1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <gtest/gtest.h>
19
20#include <arrow/test-util.h>
21
22#include "parquet/column_reader.h"
23#include "parquet/column_writer.h"
24#include "parquet/test-specialization.h"
25#include "parquet/test-util.h"
26#include "parquet/thrift.h"
27#include "parquet/types.h"
28#include "parquet/util/comparison.h"
29#include "parquet/util/memory.h"
30
31namespace parquet {
32
33using schema::GroupNode;
34using schema::NodePtr;
35using schema::PrimitiveNode;
36
37namespace test {
38
// The default size used in most tests.
constexpr int SMALL_SIZE = 100;
#ifdef PARQUET_VALGRIND
// Larger size to test some corner cases, only used in some specific cases.
constexpr int LARGE_SIZE = 10000;
// Very large size to test dictionary fallback.
constexpr int VERY_LARGE_SIZE = 40000;
// Reduced dictionary page size to use for testing dictionary fallback with valgrind
constexpr int64_t DICTIONARY_PAGE_SIZE = 1024;
#else
// Larger size to test some corner cases, only used in some specific cases.
constexpr int LARGE_SIZE = 100000;
// Very large size to test dictionary fallback.
constexpr int VERY_LARGE_SIZE = 400000;
// Dictionary page size to use for testing dictionary fallback
constexpr int64_t DICTIONARY_PAGE_SIZE = 1024 * 1024;
#endif
56
// Typed fixture exercising TypedColumnWriter<TestType> end to end: values are
// written through a writer into an in-memory sink, then read back with a
// TypedColumnReader over the same bytes and compared against the input.
template <typename TestType>
class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
 public:
  typedef typename TestType::c_type T;

  void SetUp() {
    this->SetupValuesOut(SMALL_SIZE);
    writer_properties_ = default_writer_properties();
    definition_levels_out_.resize(SMALL_SIZE);
    repetition_levels_out_.resize(SMALL_SIZE);

    // Default schema is a single REQUIRED column; individual tests may call
    // SetUpSchema again to switch to OPTIONAL/REPEATED.
    this->SetUpSchema(Repetition::REQUIRED);

    descr_ = this->schema_.Column(0);
  }

  Type::type type_num() { return TestType::type_num; }

  // Open a page/column reader over the bytes previously written into sink_.
  // The compression type must match what the writer used.
  void BuildReader(int64_t num_rows,
                   Compression::type compression = Compression::UNCOMPRESSED) {
    auto buffer = sink_->GetBuffer();
    std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
    std::unique_ptr<PageReader> page_reader =
        PageReader::Open(std::move(source), num_rows, compression);
    reader_.reset(new TypedColumnReader<TestType>(this->descr_, std::move(page_reader)));
  }

  // Create a fresh in-memory sink and a typed writer over it, configured from
  // column_properties (encoding/compression/statistics) and the given format
  // version. NOTE(review): output_size is currently unused by the body.
  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
      int64_t output_size = SMALL_SIZE,
      const ColumnProperties& column_properties = ColumnProperties(),
      const ParquetVersion::type version = ParquetVersion::PARQUET_1_0) {
    sink_.reset(new InMemoryOutputStream());
    WriterProperties::Builder wp_builder;
    wp_builder.version(version);
    // Dictionary encodings are requested through enable_dictionary() rather
    // than encoding(); only non-dictionary encodings go through encoding().
    if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
        column_properties.encoding() == Encoding::RLE_DICTIONARY) {
      wp_builder.enable_dictionary();
      wp_builder.dictionary_pagesize_limit(DICTIONARY_PAGE_SIZE);
    } else {
      wp_builder.disable_dictionary();
      wp_builder.encoding(column_properties.encoding());
    }
    wp_builder.max_statistics_size(column_properties.max_statistics_size());
    writer_properties_ = wp_builder.build();

    metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_);
    std::unique_ptr<PageWriter> pager =
        PageWriter::Open(sink_.get(), column_properties.compression(), metadata_.get());
    std::shared_ptr<ColumnWriter> writer =
        ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get());
    return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
  }

  // Read a single batch of values_out_.size() values into the *_out_ buffers.
  void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) {
    BuildReader(static_cast<int64_t>(this->values_out_.size()), compression);
    reader_->ReadBatch(static_cast<int>(this->values_out_.size()),
                       definition_levels_out_.data(), repetition_levels_out_.data(),
                       this->values_out_ptr_, &values_read_);
    this->SyncValuesOut();
  }

  // Read until values_out_ is completely filled (may span several ReadBatch
  // calls); defined below, with a FLBAType specialization.
  void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED);

  void TestRequiredWithEncoding(Encoding::type encoding) {
    return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false);
  }

  // Write num_rows generated values, once densely and once spaced, with the
  // given settings, and verify the read-back matches each time.
  void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression,
                                bool enable_dictionary, bool enable_statistics,
                                int64_t num_rows = SMALL_SIZE) {
    this->GenerateData(num_rows);

    this->WriteRequiredWithSettings(encoding, compression, enable_dictionary,
                                    enable_statistics, num_rows);
    ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows));

    this->WriteRequiredWithSettingsSpaced(encoding, compression, enable_dictionary,
                                          enable_statistics, num_rows);
    ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows));
  }

  // Write enough values that the dictionary page limit is exceeded and the
  // writer falls back to plain encoding; verify both the data round-trip and
  // the list of encodings recorded in the column chunk metadata.
  void TestDictionaryFallbackEncoding(ParquetVersion::type version) {
    this->GenerateData(VERY_LARGE_SIZE);
    ColumnProperties column_properties;
    column_properties.set_dictionary_enabled(true);

    if (version == ParquetVersion::PARQUET_1_0) {
      column_properties.set_encoding(Encoding::PLAIN_DICTIONARY);
    } else {
      column_properties.set_encoding(Encoding::RLE_DICTIONARY);
    }

    auto writer = this->BuildWriter(VERY_LARGE_SIZE, column_properties, version);

    writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
    writer->Close();

    // Read all rows so we are sure that also the non-dictionary pages are read correctly
    this->SetupValuesOut(VERY_LARGE_SIZE);
    this->ReadColumnFully();
    ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_);
    this->values_.resize(VERY_LARGE_SIZE);
    ASSERT_EQ(this->values_, this->values_out_);
    std::vector<Encoding::type> encodings = this->metadata_encodings();

    if (this->type_num() == Type::BOOLEAN) {
      // Dictionary encoding is not allowed for boolean type
      // There are 2 encodings (PLAIN, RLE) in a non dictionary encoding case
      std::vector<Encoding::type> expected({Encoding::PLAIN, Encoding::RLE});
      ASSERT_EQ(encodings, expected);
    } else if (version == ParquetVersion::PARQUET_1_0) {
      // There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case
      // for version 1.0
      std::vector<Encoding::type> expected(
          {Encoding::PLAIN_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN});
      ASSERT_EQ(encodings, expected);
    } else {
      // There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case for
      // version 2.0
      std::vector<Encoding::type> expected(
          {Encoding::RLE_DICTIONARY, Encoding::PLAIN, Encoding::RLE, Encoding::PLAIN});
      ASSERT_EQ(encodings, expected);
    }
  }

  // Dense write of the pre-generated values with the given settings.
  void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
                                 bool enable_dictionary, bool enable_statistics,
                                 int64_t num_rows) {
    ColumnProperties column_properties(encoding, compression, enable_dictionary,
                                       enable_statistics);
    std::shared_ptr<TypedColumnWriter<TestType>> writer =
        this->BuildWriter(num_rows, column_properties);
    writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
    // The behaviour should be independent from the number of Close() calls
    writer->Close();
    writer->Close();
  }

  // Same as above but through WriteBatchSpaced with an all-valid bitmap.
  void WriteRequiredWithSettingsSpaced(Encoding::type encoding,
                                       Compression::type compression,
                                       bool enable_dictionary, bool enable_statistics,
                                       int64_t num_rows) {
    std::vector<uint8_t> valid_bits(
        BitUtil::BytesForBits(static_cast<uint32_t>(this->values_.size())) + 1, 255);
    ColumnProperties column_properties(encoding, compression, enable_dictionary,
                                       enable_statistics);
    std::shared_ptr<TypedColumnWriter<TestType>> writer =
        this->BuildWriter(num_rows, column_properties);
    writer->WriteBatchSpaced(this->values_.size(), nullptr, nullptr, valid_bits.data(), 0,
                             this->values_ptr_);
    // The behaviour should be independent from the number of Close() calls
    writer->Close();
    writer->Close();
  }

  // Read everything back; each pair of values must compare "not less than"
  // in both directions under the column's comparator (i.e. be equal), and the
  // whole vectors must match exactly.
  void ReadAndCompare(Compression::type compression, int64_t num_rows) {
    this->SetupValuesOut(num_rows);
    this->ReadColumnFully(compression);
    std::shared_ptr<CompareDefault<TestType>> compare;
    compare = std::static_pointer_cast<CompareDefault<TestType>>(
        Comparator::Make(this->descr_));
    for (size_t i = 0; i < this->values_.size(); i++) {
      if ((*compare)(this->values_[i], this->values_out_[i]) ||
          (*compare)(this->values_out_[i], this->values_[i])) {
        std::cout << "Failed at " << i << std::endl;
      }
      ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i]));
      ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i]));
    }
    ASSERT_EQ(this->values_, this->values_out_);
  }

  int64_t metadata_num_values() {
    // Metadata accessor must be created lazily.
    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
    // complete (no changes to the metadata buffer can be made after instantiation)
    auto metadata_accessor =
        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
    return metadata_accessor->num_values();
  }

  bool metadata_is_stats_set() {
    // Metadata accessor must be created lazily.
    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
    // complete (no changes to the metadata buffer can be made after instantiation)
    ApplicationVersion app_version(this->writer_properties_->created_by());
    auto metadata_accessor =
        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
    return metadata_accessor->is_stats_set();
  }

  std::vector<Encoding::type> metadata_encodings() {
    // Metadata accessor must be created lazily.
    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
    // complete (no changes to the metadata buffer can be made after instantiation)
    auto metadata_accessor =
        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
    return metadata_accessor->encodings();
  }

 protected:
  int64_t values_read_;
  // Keep the reader alive as for ByteArray the lifetime of the ByteArray
  // content is bound to the reader.
  std::unique_ptr<TypedColumnReader<TestType>> reader_;

  std::vector<int16_t> definition_levels_out_;
  std::vector<int16_t> repetition_levels_out_;

  const ColumnDescriptor* descr_;

 private:
  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
  std::unique_ptr<InMemoryOutputStream> sink_;
  std::shared_ptr<WriterProperties> writer_properties_;
  // Backing storage used by the FLBAType ReadColumnFully specialization to
  // keep the read bytes alive across ReadBatch calls.
  std::vector<std::vector<uint8_t>> data_buffer_;
};
274
// Generic ReadColumnFully: loop over ReadBatch until all of values_out_ has
// been filled, advancing the level/value output pointers by the running count
// of values read so far.
template <typename TestType>
void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) {
  int64_t total_values = static_cast<int64_t>(this->values_out_.size());
  BuildReader(total_values, compression);
  values_read_ = 0;
  while (values_read_ < total_values) {
    int64_t values_read_recently = 0;
    reader_->ReadBatch(
        static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
        definition_levels_out_.data() + values_read_,
        repetition_levels_out_.data() + values_read_,
        this->values_out_ptr_ + values_read_, &values_read_recently);
    values_read_ += values_read_recently;
  }
  this->SyncValuesOut();
}
291
292template <>
293void TestPrimitiveWriter<Int96Type>::ReadAndCompare(Compression::type compression,
294 int64_t num_rows) {
295 this->SetupValuesOut(num_rows);
296 this->ReadColumnFully(compression);
297 std::shared_ptr<CompareDefault<Int96Type>> compare;
298 compare = std::make_shared<CompareDefaultInt96>();
299 for (size_t i = 0; i < this->values_.size(); i++) {
300 if ((*compare)(this->values_[i], this->values_out_[i]) ||
301 (*compare)(this->values_out_[i], this->values_[i])) {
302 std::cout << "Failed at " << i << std::endl;
303 }
304 ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i]));
305 ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i]));
306 }
307 ASSERT_EQ(this->values_, this->values_out_);
308}
309
// FLBAType specialization of ReadColumnFully: same batching loop as the
// generic version, but the FixedLenByteArray values returned by the reader
// point into reader-managed memory that may not survive the next ReadBatch.
// Each batch's bytes are therefore copied into data_buffer_ and the FLBA
// pointers repointed at the copies so they remain valid afterwards.
template <>
void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
  int64_t total_values = static_cast<int64_t>(this->values_out_.size());
  BuildReader(total_values, compression);
  this->data_buffer_.clear();

  values_read_ = 0;
  while (values_read_ < total_values) {
    int64_t values_read_recently = 0;
    reader_->ReadBatch(
        static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
        definition_levels_out_.data() + values_read_,
        repetition_levels_out_.data() + values_read_,
        this->values_out_ptr_ + values_read_, &values_read_recently);

    // Copy contents of the pointers
    std::vector<uint8_t> data(values_read_recently * this->descr_->type_length());
    uint8_t* data_ptr = data.data();
    for (int64_t i = 0; i < values_read_recently; i++) {
      memcpy(data_ptr + this->descr_->type_length() * i,
             this->values_out_[i + values_read_].ptr, this->descr_->type_length());
      this->values_out_[i + values_read_].ptr =
          data_ptr + this->descr_->type_length() * i;
    }
    // data_buffer_ owns the copied bytes for the rest of the test.
    data_buffer_.emplace_back(std::move(data));

    values_read_ += values_read_recently;
  }
  this->SyncValuesOut();
}
340
// Every physical Parquet type is run through the typed fixture.
typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
                         BooleanType, ByteArrayType, FLBAType>
    TestTypes;

TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);

// Int32-specialized alias used by the NULL-value tests further down.
using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;
348
// Round-trip a REQUIRED column with PLAIN encoding.
TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
  this->TestRequiredWithEncoding(Encoding::PLAIN);
}
352
// Round-trip a REQUIRED column with PLAIN_DICTIONARY encoding.
TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
  this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY);
}
356
357/*
358TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
359 this->TestRequiredWithEncoding(Encoding::RLE);
360}
361
362TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
363 this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
364}
365
366TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
367 this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
368}
369
370TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
371 this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
372}
373
374TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
375 this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
376}
377
378TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
379 this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
380}
381*/
382
// PLAIN + SNAPPY round-trip, no statistics.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, false,
                                 LARGE_SIZE);
}
387
// PLAIN + BROTLI round-trip, no statistics.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, false,
                                 LARGE_SIZE);
}
392
// PLAIN + GZIP round-trip, no statistics.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false,
                                 LARGE_SIZE);
}
397
// PLAIN + LZ4 round-trip, no statistics.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false,
                                 LARGE_SIZE);
}
402
// PLAIN, uncompressed, with statistics enabled.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true,
                                 LARGE_SIZE);
}
407
// PLAIN + SNAPPY with statistics enabled.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, true,
                                 LARGE_SIZE);
}
412
// PLAIN + BROTLI with statistics enabled.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndBrotliCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, true,
                                 LARGE_SIZE);
}
417
// PLAIN + GZIP with statistics enabled.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true,
                                 LARGE_SIZE);
}
422
// PLAIN + LZ4 with statistics enabled.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndLz4Compression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, true,
                                 LARGE_SIZE);
}
427
// The ExternalProject for zstd does not build on CMake < 3.7, so we do not
// require it here
#ifdef ARROW_WITH_ZSTD
// PLAIN + ZSTD round-trip, no statistics.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, false,
                                 LARGE_SIZE);
}

// PLAIN + ZSTD with statistics enabled.
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndZstdCompression) {
  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, true,
                                 LARGE_SIZE);
}
#endif
441
442TYPED_TEST(TestPrimitiveWriter, Optional) {
443 // Optional and non-repeated, with definition levels
444 // but no repetition levels
445 this->SetUpSchema(Repetition::OPTIONAL);
446
447 this->GenerateData(SMALL_SIZE);
448 std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
449 definition_levels[1] = 0;
450
451 auto writer = this->BuildWriter();
452 writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr,
453 this->values_ptr_);
454 writer->Close();
455
456 // PARQUET-703
457 ASSERT_EQ(100, this->metadata_num_values());
458
459 this->ReadColumn();
460 ASSERT_EQ(99, this->values_read_);
461 this->values_out_.resize(99);
462 this->values_.resize(99);
463 ASSERT_EQ(this->values_, this->values_out_);
464}
465
466TYPED_TEST(TestPrimitiveWriter, OptionalSpaced) {
467 // Optional and non-repeated, with definition levels
468 // but no repetition levels
469 this->SetUpSchema(Repetition::OPTIONAL);
470
471 this->GenerateData(SMALL_SIZE);
472 std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
473 std::vector<uint8_t> valid_bits(::arrow::BitUtil::BytesForBits(SMALL_SIZE), 255);
474
475 definition_levels[SMALL_SIZE - 1] = 0;
476 ::arrow::BitUtil::ClearBit(valid_bits.data(), SMALL_SIZE - 1);
477 definition_levels[1] = 0;
478 ::arrow::BitUtil::ClearBit(valid_bits.data(), 1);
479
480 auto writer = this->BuildWriter();
481 writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr,
482 valid_bits.data(), 0, this->values_ptr_);
483 writer->Close();
484
485 // PARQUET-703
486 ASSERT_EQ(100, this->metadata_num_values());
487
488 this->ReadColumn();
489 ASSERT_EQ(98, this->values_read_);
490 this->values_out_.resize(98);
491 this->values_.resize(99);
492 this->values_.erase(this->values_.begin() + 1);
493 ASSERT_EQ(this->values_, this->values_out_);
494}
495
// REPEATED column: both definition and repetition levels are written; one
// slot is NULL, so one fewer value comes back.
TYPED_TEST(TestPrimitiveWriter, Repeated) {
  // Optional and repeated, so definition and repetition levels
  this->SetUpSchema(Repetition::REPEATED);

  this->GenerateData(SMALL_SIZE);
  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
  definition_levels[1] = 0;
  std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);

  auto writer = this->BuildWriter();
  writer->WriteBatch(this->values_.size(), definition_levels.data(),
                     repetition_levels.data(), this->values_ptr_);
  writer->Close();

  this->ReadColumn();
  ASSERT_EQ(SMALL_SIZE - 1, this->values_read_);
  this->values_out_.resize(SMALL_SIZE - 1);
  this->values_.resize(SMALL_SIZE - 1);
  ASSERT_EQ(this->values_, this->values_out_);
}
516
// Write a large chunk but read back only the first SMALL_SIZE rows.
TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
  this->GenerateData(LARGE_SIZE);

  // Test case 1: required and non-repeated, so no definition or repetition levels
  auto writer = this->BuildWriter(LARGE_SIZE);
  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
  writer->Close();

  // Just read the first SMALL_SIZE rows to ensure we could read it back in
  this->ReadColumn();
  ASSERT_EQ(SMALL_SIZE, this->values_read_);
  this->values_.resize(SMALL_SIZE);
  ASSERT_EQ(this->values_, this->values_out_);
}
531
// Test cases for dictionary fallback encoding
// Format v1.0: fallback path uses PLAIN_DICTIONARY.
TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion1_0) {
  this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_1_0);
}
536
// Format v2.0: fallback path uses RLE_DICTIONARY.
TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion2_0) {
  this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_0);
}
540
// PARQUET-719
// Test case for NULL values
// An OPTIONAL column where every slot is NULL must round-trip and yield zero
// values on read.
TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
  this->SetUpSchema(Repetition::OPTIONAL);

  this->GenerateData(LARGE_SIZE);

  // All definition levels are 0, i.e. every value slot is NULL
  std::vector<int16_t> definition_levels(LARGE_SIZE, 0);
  std::vector<int16_t> repetition_levels(LARGE_SIZE, 0);

  auto writer = this->BuildWriter(LARGE_SIZE);
  // All values being written are NULL
  writer->WriteBatch(this->values_.size(), definition_levels.data(),
                     repetition_levels.data(), nullptr);
  writer->Close();

  // Just read the first SMALL_SIZE rows to ensure we could read it back in
  this->ReadColumn();
  ASSERT_EQ(0, this->values_read_);
}
561
562// PARQUET-764
563// Correct bitpacking for boolean write at non-byte boundaries
564using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>;
565TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
566 this->SetUpSchema(Repetition::REQUIRED);
567 auto writer = this->BuildWriter();
568 for (int i = 0; i < SMALL_SIZE; i++) {
569 bool value = (i % 2 == 0) ? true : false;
570 writer->WriteBatch(1, nullptr, nullptr, &value);
571 }
572 writer->Close();
573 this->ReadColumn();
574 for (int i = 0; i < SMALL_SIZE; i++) {
575 ASSERT_EQ((i % 2 == 0) ? true : false, this->values_out_[i]) << i;
576 }
577}
578
// PARQUET-979
// Prevent writing large stats
using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;
// Values of 4-8 KiB are expected to exceed the default statistics size limit,
// so the chunk metadata must report statistics as not set.
TEST_F(TestByteArrayValuesWriter, OmitStats) {
  int min_len = 1024 * 4;
  int max_len = 1024 * 8;
  this->SetUpSchema(Repetition::REQUIRED);
  auto writer = this->BuildWriter();

  values_.resize(SMALL_SIZE);
  InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
  writer->Close();

  ASSERT_FALSE(this->metadata_is_stats_set());
}
595
// Raising max_statistics_size to cover the value width keeps statistics set.
TEST_F(TestByteArrayValuesWriter, LimitStats) {
  int min_len = 1024 * 4;
  int max_len = 1024 * 8;
  this->SetUpSchema(Repetition::REQUIRED);
  ColumnProperties column_properties;
  column_properties.set_max_statistics_size(static_cast<size_t>(max_len));
  auto writer = this->BuildWriter(SMALL_SIZE, column_properties);

  values_.resize(SMALL_SIZE);
  InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
  writer->Close();

  ASSERT_TRUE(this->metadata_is_stats_set());
}
611
// Normal-sized generated values leave statistics enabled by default.
TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
  this->SetUpSchema(Repetition::REQUIRED);
  auto writer = this->BuildWriter();
  this->GenerateData(SMALL_SIZE);

  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_ptr_);
  writer->Close();

  ASSERT_TRUE(this->metadata_is_stats_set());
}
622
// Regression test for ARROW-3930: WriteBatchSpaced over a repeated list
// schema; the failure mode is detected by valgrind (out-of-bounds read), so
// there is no explicit assertion on the output.
TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) {
  // In ARROW-3930 we discovered a bug when writing from Arrow when we had data
  // that looks like this:
  //
  // [null, [0, 1, null, 2, 3, 4, null]]

  // Create schema
  NodePtr item = schema::Int32("item");  // optional item
  NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, LogicalType::LIST));
  NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list}));  // optional list
  std::vector<NodePtr> fields = {bag};
  NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, fields);

  SchemaDescriptor schema;
  schema.Init(root);

  InMemoryOutputStream sink;
  auto props = WriterProperties::Builder().build();

  auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0));
  std::unique_ptr<PageWriter> pager =
      PageWriter::Open(&sink, Compression::UNCOMPRESSED, metadata.get());
  std::shared_ptr<ColumnWriter> writer =
      ColumnWriter::Make(metadata.get(), std::move(pager), props.get());
  auto typed_writer = std::static_pointer_cast<TypedColumnWriter<Int32Type>>(writer);

  // Levels and validity bitmap encoding the nested data above
  std::vector<int16_t> def_levels = {1, 3, 3, 2, 3, 3, 3, 2, 3, 3, 3, 2, 3, 3};
  std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
  std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};

  // Write the values into uninitialized memory
  std::shared_ptr<Buffer> values_buffer;
  ASSERT_OK(::arrow::AllocateBuffer(64, &values_buffer));
  memcpy(values_buffer->mutable_data(), values.data(), 13 * sizeof(int32_t));
  auto values_data = reinterpret_cast<const int32_t*>(values_buffer->data());

  std::shared_ptr<Buffer> valid_bits;
  ASSERT_OK(::arrow::BitUtil::BytesToBits({1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1},
                                          ::arrow::default_memory_pool(), &valid_bits));

  // valgrind will warn about out of bounds access into def_levels_data
  typed_writer->WriteBatchSpaced(14, def_levels.data(), rep_levels.data(),
                                 valid_bits->data(), 0, values_data);
  writer->Close();
}
668
// Append synthetic level data to input_levels. For each repeat factor in
// [min_repeat_factor, max_repeat_factor] a block of levels is appended whose
// run length is 2^repeat; within a block the level values follow the
// sequence 0, 1, 3, 7, 15, ... (2^k - 1) until the value exceeds max_level.
void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
                    std::vector<int16_t>& input_levels) {
  for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
    // Run length doubles with every repeat factor
    const int repeat_count = 1 << repeat;
    int16_t value = 0;
    int bwidth = 0;
    while (value <= max_level) {
      // Append repeat_count copies of the current level value in one shot
      input_levels.insert(input_levels.end(), repeat_count, value);
      value = static_cast<int16_t>((2 << bwidth) - 1);
      bwidth++;
    }
  }
}
687
// Encode num_levels input levels into bytes using the given encoding.
// For RLE, the first 4 bytes of the output carry the encoded length,
// mirroring the v1 data page layout; BIT_PACKED writes data from offset 0.
void EncodeLevels(Encoding::type encoding, int16_t max_level, int num_levels,
                  const int16_t* input_levels, std::vector<uint8_t>& bytes) {
  LevelEncoder encoder;
  int levels_count = 0;
  bytes.resize(2 * num_levels);
  ASSERT_EQ(2 * num_levels, static_cast<int>(bytes.size()));
  // encode levels
  if (encoding == Encoding::RLE) {
    // leave space to write the rle length value
    encoder.Init(encoding, max_level, num_levels, bytes.data() + sizeof(int32_t),
                 static_cast<int>(bytes.size()));

    levels_count = encoder.Encode(num_levels, input_levels);
    (reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len();
  } else {
    encoder.Init(encoding, max_level, num_levels, bytes.data(),
                 static_cast<int>(bytes.size()));
    levels_count = encoder.Encode(num_levels, input_levels);
  }
  // Every level must have been consumed by the encoder
  ASSERT_EQ(num_levels, levels_count);
}
709
// Decode the encoded bytes back and compare against input_levels. A single
// SetData call is exercised with several partial Decode calls: four equal
// chunks, then the remainder, then a final Decode that must return 0 because
// the stream is exhausted.
void VerifyDecodingLevels(Encoding::type encoding, int16_t max_level,
                          std::vector<int16_t>& input_levels,
                          std::vector<uint8_t>& bytes) {
  LevelDecoder decoder;
  int levels_count = 0;
  std::vector<int16_t> output_levels;
  int num_levels = static_cast<int>(input_levels.size());

  output_levels.resize(num_levels);
  ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));

  // Decode levels and test with multiple decode calls
  decoder.SetData(encoding, max_level, num_levels, bytes.data());
  int decode_count = 4;
  int num_inner_levels = num_levels / decode_count;
  // Try multiple decoding on a single SetData call
  for (int ct = 0; ct < decode_count; ct++) {
    int offset = ct * num_inner_levels;
    levels_count = decoder.Decode(num_inner_levels, output_levels.data());
    ASSERT_EQ(num_inner_levels, levels_count);
    for (int i = 0; i < num_inner_levels; i++) {
      EXPECT_EQ(input_levels[i + offset], output_levels[i]);
    }
  }
  // check the remaining levels
  int num_levels_completed = decode_count * (num_levels / decode_count);
  int num_remaining_levels = num_levels - num_levels_completed;
  if (num_remaining_levels > 0) {
    levels_count = decoder.Decode(num_remaining_levels, output_levels.data());
    ASSERT_EQ(num_remaining_levels, levels_count);
    for (int i = 0; i < num_remaining_levels; i++) {
      EXPECT_EQ(input_levels[i + num_levels_completed], output_levels[i]);
    }
  }
  // Test zero Decode values
  ASSERT_EQ(0, decoder.Decode(1, output_levels.data()));
}
747
// Decode input_levels that were encoded into several independent byte
// buffers: one SetData + full Decode per buffer, each compared against the
// corresponding slice of input_levels.
void VerifyDecodingMultipleSetData(Encoding::type encoding, int16_t max_level,
                                   std::vector<int16_t>& input_levels,
                                   std::vector<std::vector<uint8_t>>& bytes) {
  LevelDecoder decoder;
  int levels_count = 0;
  std::vector<int16_t> output_levels;

  // Decode levels and test with multiple SetData calls
  int setdata_count = static_cast<int>(bytes.size());
  int num_levels = static_cast<int>(input_levels.size()) / setdata_count;
  output_levels.resize(num_levels);
  // Try multiple SetData
  for (int ct = 0; ct < setdata_count; ct++) {
    int offset = ct * num_levels;
    ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));
    decoder.SetData(encoding, max_level, num_levels, bytes[ct].data());
    levels_count = decoder.Decode(num_levels, output_levels.data());
    ASSERT_EQ(num_levels, levels_count);
    for (int i = 0; i < num_levels; i++) {
      EXPECT_EQ(input_levels[i + offset], output_levels[i]);
    }
  }
}
771
// Test levels with maximum bit-width from 1 to 8
// increase the repetition count for each iteration by a factor of 2
TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) {
  int min_repeat_factor = 0;
  int max_repeat_factor = 7;  // 128
  int max_bit_width = 8;
  std::vector<int16_t> input_levels;
  std::vector<uint8_t> bytes;
  Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};

  // for each encoding
  for (int encode = 0; encode < 2; encode++) {
    Encoding::type encoding = encodings[encode];
    // BIT_PACKED requires a sequence of atleast 8
    // (RLE is processed first, so bumping min_repeat_factor here does not
    // affect the RLE iteration even though it persists)
    if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3;
    // for each maximum bit-width
    for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) {
      // find the maximum level for the current bit_width
      int16_t max_level = static_cast<int16_t>((1 << bit_width) - 1);
      // Generate levels
      GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
      ASSERT_NO_FATAL_FAILURE(EncodeLevels(encoding, max_level,
                                           static_cast<int>(input_levels.size()),
                                           input_levels.data(), bytes));
      ASSERT_NO_FATAL_FAILURE(
          VerifyDecodingLevels(encoding, max_level, input_levels, bytes));
      input_levels.clear();
    }
  }
}
802
803// Test multiple decoder SetData calls
804TEST(TestLevels, TestLevelsDecodeMultipleSetData) {
805 int min_repeat_factor = 3;
806 int max_repeat_factor = 7; // 128
807 int bit_width = 8;
808 int16_t max_level = static_cast<int16_t>((1 << bit_width) - 1);
809 std::vector<int16_t> input_levels;
810 std::vector<std::vector<uint8_t>> bytes;
811 Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
812 GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
813 int num_levels = static_cast<int>(input_levels.size());
814 int setdata_factor = 8;
815 int split_level_size = num_levels / setdata_factor;
816 bytes.resize(setdata_factor);
817
818 // for each encoding
819 for (int encode = 0; encode < 2; encode++) {
820 Encoding::type encoding = encodings[encode];
821 for (int rf = 0; rf < setdata_factor; rf++) {
822 int offset = rf * split_level_size;
823 ASSERT_NO_FATAL_FAILURE(EncodeLevels(
824 encoding, max_level, split_level_size,
825 reinterpret_cast<int16_t*>(input_levels.data()) + offset, bytes[rf]));
826 }
827 ASSERT_NO_FATAL_FAILURE(
828 VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes));
829 }
830}
831
// A buffer sized by LevelEncoder::MaxBufferSize must be large enough to
// encode every level (no truncation).
TEST(TestLevelEncoder, MinimumBufferSize) {
  // PARQUET-676, PARQUET-698
  const int kNumToEncode = 1024;

  std::vector<int16_t> levels;
  for (int i = 0; i < kNumToEncode; ++i) {
    // Mostly 1s with an occasional 0 to force mixed runs
    if (i % 9 == 0) {
      levels.push_back(0);
    } else {
      levels.push_back(1);
    }
  }

  std::vector<uint8_t> output(
      LevelEncoder::MaxBufferSize(Encoding::RLE, 1, kNumToEncode));

  LevelEncoder encoder;
  encoder.Init(Encoding::RLE, 1, kNumToEncode, output.data(),
               static_cast<int>(output.size()));
  int encode_count = encoder.Encode(kNumToEncode, levels.data());

  ASSERT_EQ(kNumToEncode, encode_count);
}
855
// MaxBufferSize must also cover the RLE worst case of alternating literal
// and repeated runs, for every bit width from 1 to 8.
TEST(TestLevelEncoder, MinimumBufferSize2) {
  // PARQUET-708
  // Test the worst case for bit_width=2 consisting of
  // LiteralRun(size=8)
  // RepeatedRun(size=8)
  // LiteralRun(size=8)
  // ...
  const int kNumToEncode = 1024;

  std::vector<int16_t> levels;
  for (int i = 0; i < kNumToEncode; ++i) {
    // This forces a literal run of 00000001
    // followed by eight 1s
    if ((i % 16) < 7) {
      levels.push_back(0);
    } else {
      levels.push_back(1);
    }
  }

  for (int16_t bit_width = 1; bit_width <= 8; bit_width++) {
    std::vector<uint8_t> output(
        LevelEncoder::MaxBufferSize(Encoding::RLE, bit_width, kNumToEncode));

    LevelEncoder encoder;
    encoder.Init(Encoding::RLE, bit_width, kNumToEncode, output.data(),
                 static_cast<int>(output.size()));
    int encode_count = encoder.Encode(kNumToEncode, levels.data());

    ASSERT_EQ(kNumToEncode, encode_count);
  }
}
888
889} // namespace test
890} // namespace parquet
891