1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#ifdef _MSC_VER
19#pragma warning(push)
// Disable warnings about forcing a value to bool
21#pragma warning(disable : 4800)
22#endif
23
24#include "gtest/gtest.h"
25
26#include <arrow/compute/api.h>
27#include <cstdint>
28#include <functional>
29#include <sstream>
30#include <vector>
31
32#include "arrow/api.h"
33#include "arrow/test-util.h"
34#include "arrow/type_traits.h"
35#include "arrow/util/decimal.h"
36
37#include "parquet/api/reader.h"
38#include "parquet/api/writer.h"
39
40#include "parquet/arrow/reader.h"
41#include "parquet/arrow/schema.h"
42#include "parquet/arrow/test-util.h"
43#include "parquet/arrow/writer.h"
44#include "parquet/file_writer.h"
45#include "parquet/util/test-common.h"
46
47using arrow::Array;
48using arrow::ArrayVisitor;
49using arrow::Buffer;
50using arrow::ChunkedArray;
51using arrow::Column;
52using arrow::DataType;
53using arrow::default_memory_pool;
54using arrow::ListArray;
55using arrow::PrimitiveArray;
56using arrow::ResizableBuffer;
57using arrow::Status;
58using arrow::Table;
59using arrow::TimeUnit;
60using arrow::compute::Datum;
61using arrow::compute::DictionaryEncode;
62using arrow::compute::FunctionContext;
63using arrow::io::BufferReader;
64
65using arrow::randint;
66using arrow::random_is_valid;
67
68using ArrowId = ::arrow::Type;
69using ParquetType = parquet::Type;
70using parquet::arrow::FromParquetSchema;
71using parquet::schema::GroupNode;
72using parquet::schema::NodePtr;
73using parquet::schema::PrimitiveNode;
74
75using ColumnVector = std::vector<std::shared_ptr<arrow::Column>>;
76
77namespace parquet {
78namespace arrow {
79
80static constexpr int SMALL_SIZE = 100;
81#ifdef PARQUET_VALGRIND
82static constexpr int LARGE_SIZE = 1000;
83#else
84static constexpr int LARGE_SIZE = 10000;
85#endif
86
87static constexpr uint32_t kDefaultSeed = 0;
88
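// Map an Arrow type to the Parquet logical (converted) type annotation used
// when writing it, or LogicalType::NONE when no annotation applies.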
89LogicalType::type get_logical_type(const ::DataType& type) {
90 switch (type.id()) {
91 case ArrowId::UINT8:
92 return LogicalType::UINT_8;
93 case ArrowId::INT8:
94 return LogicalType::INT_8;
95 case ArrowId::UINT16:
96 return LogicalType::UINT_16;
97 case ArrowId::INT16:
98 return LogicalType::INT_16;
99 case ArrowId::UINT32:
100 return LogicalType::UINT_32;
101 case ArrowId::INT32:
102 return LogicalType::INT_32;
103 case ArrowId::UINT64:
104 return LogicalType::UINT_64;
105 case ArrowId::INT64:
106 return LogicalType::INT_64;
107 case ArrowId::STRING:
108 return LogicalType::UTF8;
109 case ArrowId::DATE32:
110 return LogicalType::DATE;
111 case ArrowId::DATE64:
112 return LogicalType::DATE;
113 case ArrowId::TIMESTAMP: {
114 const auto& ts_type = static_cast<const ::arrow::TimestampType&>(type);
115 switch (ts_type.unit()) {
116 case TimeUnit::MILLI:
117 return LogicalType::TIMESTAMP_MILLIS;
118 case TimeUnit::MICRO:
119 return LogicalType::TIMESTAMP_MICROS;
120 default:
121 DCHECK(false) << "Only MILLI and MICRO units supported for Arrow timestamps "
122 "with Parquet.";
123 }
124 }
125 case ArrowId::TIME32:
126 return LogicalType::TIME_MILLIS;
127 case ArrowId::TIME64:
128 return LogicalType::TIME_MICROS;
129 case ArrowId::DICTIONARY: {
130 const ::arrow::DictionaryType& dict_type =
131 static_cast<const ::arrow::DictionaryType&>(type);
132 return get_logical_type(*dict_type.dictionary()->type());
133 }
134 case ArrowId::DECIMAL:
135 return LogicalType::DECIMAL;
136 default:
137 break;
138 }
139 return LogicalType::NONE;
140}
141
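// Map an Arrow type to the Parquet physical (storage) type the writer uses
// for it; dictionary types are resolved through their value type.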
142ParquetType::type get_physical_type(const ::DataType& type) {
143 switch (type.id()) {
144 case ArrowId::BOOL:
145 return ParquetType::BOOLEAN;
146 case ArrowId::UINT8:
147 case ArrowId::INT8:
148 case ArrowId::UINT16:
149 case ArrowId::INT16:
150 case ArrowId::UINT32:
151 case ArrowId::INT32:
152 return ParquetType::INT32;
153 case ArrowId::UINT64:
154 case ArrowId::INT64:
155 return ParquetType::INT64;
156 case ArrowId::FLOAT:
157 return ParquetType::FLOAT;
158 case ArrowId::DOUBLE:
159 return ParquetType::DOUBLE;
160 case ArrowId::BINARY:
161 return ParquetType::BYTE_ARRAY;
162 case ArrowId::STRING:
163 return ParquetType::BYTE_ARRAY;
164 case ArrowId::FIXED_SIZE_BINARY:
165 case ArrowId::DECIMAL:
166 return ParquetType::FIXED_LEN_BYTE_ARRAY;
167 case ArrowId::DATE32:
168 return ParquetType::INT32;
169 case ArrowId::DATE64:
170 // Convert to date32 internally
171 return ParquetType::INT32;
172 case ArrowId::TIME32:
173 return ParquetType::INT32;
174 case ArrowId::TIME64:
175 return ParquetType::INT64;
176 case ArrowId::TIMESTAMP:
177 return ParquetType::INT64;
178 case ArrowId::DICTIONARY: {
179 const ::arrow::DictionaryType& dict_type =
180 static_cast<const ::arrow::DictionaryType&>(type);
181 return get_physical_type(*dict_type.dictionary()->type());
182 }
183 default:
184 break;
185 }
186 DCHECK(false) << "cannot reach this code";
187 return ParquetType::INT32;
188}
189
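// test_traits associates each Arrow type under test with the Parquet physical
// type it is stored as and a representative value used to fill test columns.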
190template <typename TestType>
191struct test_traits {};
192
193template <>
194struct test_traits<::arrow::BooleanType> {
195 static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
196 static uint8_t const value;
197};
198
199const uint8_t test_traits<::arrow::BooleanType>::value(1);
200
201template <>
202struct test_traits<::arrow::UInt8Type> {
203 static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
204 static uint8_t const value;
205};
206
207const uint8_t test_traits<::arrow::UInt8Type>::value(64);
208
209template <>
210struct test_traits<::arrow::Int8Type> {
211 static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
212 static int8_t const value;
213};
214
215const int8_t test_traits<::arrow::Int8Type>::value(-64);
216
217template <>
218struct test_traits<::arrow::UInt16Type> {
219 static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
220 static uint16_t const value;
221};
222
223const uint16_t test_traits<::arrow::UInt16Type>::value(1024);
224
225template <>
226struct test_traits<::arrow::Int16Type> {
227 static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
228 static int16_t const value;
229};
230
231const int16_t test_traits<::arrow::Int16Type>::value(-1024);
232
233template <>
234struct test_traits<::arrow::UInt32Type> {
235 static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
236 static uint32_t const value;
237};
238
239const uint32_t test_traits<::arrow::UInt32Type>::value(1024);
240
241template <>
242struct test_traits<::arrow::Int32Type> {
243 static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
244 static int32_t const value;
245};
246
247const int32_t test_traits<::arrow::Int32Type>::value(-1024);
248
249template <>
250struct test_traits<::arrow::UInt64Type> {
251 static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
252 static uint64_t const value;
253};
254
255const uint64_t test_traits<::arrow::UInt64Type>::value(1024);
256
257template <>
258struct test_traits<::arrow::Int64Type> {
259 static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
260 static int64_t const value;
261};
262
263const int64_t test_traits<::arrow::Int64Type>::value(-1024);
264
265template <>
266struct test_traits<::arrow::TimestampType> {
267 static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
268 static int64_t const value;
269};
270
271const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
272
273template <>
274struct test_traits<::arrow::Date32Type> {
275 static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
276 static int32_t const value;
277};
278
279const int32_t test_traits<::arrow::Date32Type>::value(170000);
280
281template <>
282struct test_traits<::arrow::FloatType> {
283 static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
284 static float const value;
285};
286
287const float test_traits<::arrow::FloatType>::value(2.1f);
288
289template <>
290struct test_traits<::arrow::DoubleType> {
291 static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
292 static double const value;
293};
294
295const double test_traits<::arrow::DoubleType>::value(4.2);
296
297template <>
298struct test_traits<::arrow::StringType> {
299 static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
300 static std::string const value;
301};
302
303template <>
304struct test_traits<::arrow::BinaryType> {
305 static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
306 static std::string const value;
307};
308
309template <>
310struct test_traits<::arrow::FixedSizeBinaryType> {
311 static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY;
312 static std::string const value;
313};
314
315const std::string test_traits<::arrow::StringType>::value("Test"); // NOLINT
316const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03"); // NOLINT
317const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed"); // NOLINT
318
319template <typename T>
320using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
321
322template <typename T>
323using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
324
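// Write the table to an in-memory Parquet file and return its contents in *out.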
325void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t row_group_size,
326 const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
327 std::shared_ptr<Buffer>* out) {
328 auto sink = std::make_shared<InMemoryOutputStream>();
329
330 ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
331 row_group_size, default_writer_properties(),
332 arrow_properties));
333 *out = sink->GetBuffer();
334}
335
336void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual) {
337 ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal";
338 if (!actual.Equals(expected)) {
339 std::stringstream pp_result;
340 std::stringstream pp_expected;
341
342 for (int i = 0; i < actual.num_chunks(); ++i) {
343 auto c1 = actual.chunk(i);
344 auto c2 = expected.chunk(i);
345 if (!c1->Equals(*c2)) {
346 EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result));
347 EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected));
348 FAIL() << "Chunk " << i << " Got: " << pp_result.str()
349 << "\nExpected: " << pp_expected.str();
350 }
351 }
352 }
353}
354
355void PrintColumn(const Column& col, std::stringstream* ss) {
356 const ChunkedArray& carr = *col.data();
357 for (int i = 0; i < carr.num_chunks(); ++i) {
358 auto c1 = carr.chunk(i);
359 *ss << "Chunk " << i << std::endl;
360 EXPECT_OK(::arrow::PrettyPrint(*c1, 0, ss));
361 *ss << std::endl;
362 }
363}
364
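// Write the table to an in-memory Parquet file, then read it back into *out,
// optionally restricting the read to a subset of column indices.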
365void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
366 int64_t row_group_size, const std::vector<int>& column_subset,
367 std::shared_ptr<Table>* out,
368 const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
369 default_arrow_writer_properties()) {
370 std::shared_ptr<Buffer> buffer;
371 ASSERT_NO_FATAL_FAILURE(
372 WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer));
373
374 std::unique_ptr<FileReader> reader;
375 ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
376 ::arrow::default_memory_pool(),
377 ::parquet::default_reader_properties(), nullptr, &reader));
378
379 reader->set_use_threads(use_threads);
380
381 if (column_subset.size() > 0) {
382 ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
383 } else {
384 // Read everything
385 ASSERT_OK_NO_THROW(reader->ReadTable(out));
386 }
387}
388
389void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
390 const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
391 default_arrow_writer_properties()) {
392 std::shared_ptr<Table> result;
393 DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result,
394 arrow_properties);
395 ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result, false));
396}
397
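// Build a single-column Parquet group schema ("column1") matching the given
// Arrow type, deriving byte width, precision and scale for fixed-size binary
// and decimal types (including their dictionary-encoded variants).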
398static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::DataType& type,
399 Repetition::type repetition) {
400 int32_t byte_width = -1;
401 int32_t precision = -1;
402 int32_t scale = -1;
403
404 switch (type.id()) {
405 case ::arrow::Type::DICTIONARY: {
406 const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type);
407 const ::DataType& values_type = *dict_type.dictionary()->type();
408 switch (values_type.id()) {
409 case ::arrow::Type::FIXED_SIZE_BINARY:
410 byte_width =
411 static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
412 break;
413 case ::arrow::Type::DECIMAL: {
414 const auto& decimal_type =
415 static_cast<const ::arrow::Decimal128Type&>(values_type);
416 precision = decimal_type.precision();
417 scale = decimal_type.scale();
418 byte_width = DecimalSize(precision);
419 } break;
420 default:
421 break;
422 }
423 } break;
424 case ::arrow::Type::FIXED_SIZE_BINARY:
425 byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
426 break;
427 case ::arrow::Type::DECIMAL: {
428 const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
429 precision = decimal_type.precision();
430 scale = decimal_type.scale();
431 byte_width = DecimalSize(precision);
432 } break;
433 default:
434 break;
435 }
436 auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type),
437 get_logical_type(type), byte_width, precision, scale);
438 NodePtr node_ =
439 GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
440 return std::static_pointer_cast<GroupNode>(node_);
441}
442
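// Typed fixture that writes Arrow data to an in-memory Parquet file and reads
// it back, with helpers for single-column, list and table roundtrip checks.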
443template <typename TestType>
444class TestParquetIO : public ::testing::Test {
445 public:
446 virtual void SetUp() {}
447
448 std::unique_ptr<ParquetFileWriter> MakeWriter(
449 const std::shared_ptr<GroupNode>& schema) {
450 sink_ = std::make_shared<InMemoryOutputStream>();
451 return ParquetFileWriter::Open(sink_, schema);
452 }
453
454 void ReaderFromSink(std::unique_ptr<FileReader>* out) {
455 std::shared_ptr<Buffer> buffer = sink_->GetBuffer();
456 ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
457 ::arrow::default_memory_pool(),
458 ::parquet::default_reader_properties(), nullptr, out));
459 }
460
461 void ReadSingleColumnFile(std::unique_ptr<FileReader> file_reader,
462 std::shared_ptr<Array>* out) {
463 std::unique_ptr<ColumnReader> column_reader;
464 ASSERT_OK_NO_THROW(file_reader->GetColumn(0, &column_reader));
465 ASSERT_NE(nullptr, column_reader.get());
466
467 std::shared_ptr<ChunkedArray> chunked_out;
468 ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, &chunked_out));
469
470 ASSERT_EQ(1, chunked_out->num_chunks());
471 *out = chunked_out->chunk(0);
472 ASSERT_NE(nullptr, out->get());
473 }
474
475 void ReadAndCheckSingleColumnFile(const Array& values) {
476 std::shared_ptr<Array> out;
477
478 std::unique_ptr<FileReader> reader;
479 ReaderFromSink(&reader);
480 ReadSingleColumnFile(std::move(reader), &out);
481
482 AssertArraysEqual(values, *out);
483 }
484
485 void ReadTableFromFile(std::unique_ptr<FileReader> reader,
486 std::shared_ptr<Table>* out) {
487 ASSERT_OK_NO_THROW(reader->ReadTable(out));
488 auto key_value_metadata =
489 reader->parquet_reader()->metadata()->key_value_metadata().get();
490 ASSERT_EQ(nullptr, key_value_metadata);
491 ASSERT_NE(nullptr, out->get());
492 }
493
494 void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements,
495 int64_t null_count, std::shared_ptr<Table>* out) {
496 std::shared_ptr<Array> values;
497 ASSERT_OK(NullableArray<TestType>(size * size, nullable_elements ? null_count : 0,
498 kDefaultSeed, &values));
499 // Also test that slice offsets are respected
500 values = values->Slice(5, values->length() - 5);
501 std::shared_ptr<ListArray> lists;
502 ASSERT_OK(MakeListArray(values, size, nullable_lists ? null_count : 0,
503 nullable_elements, &lists));
504 *out = MakeSimpleTable(lists->Slice(3, size - 6), nullable_lists);
505 }
506
  // Prepare a table of empty lists, with a null values array (ARROW-2744)
508 void PrepareEmptyListsTable(int64_t size, std::shared_ptr<Table>* out) {
509 std::shared_ptr<Array> lists;
510 ASSERT_OK(MakeEmptyListsArray(size, &lists));
511 *out = MakeSimpleTable(lists, true /* nullable_lists */);
512 }
513
514 void PrepareListOfListTable(int64_t size, bool nullable_parent_lists,
515 bool nullable_lists, bool nullable_elements,
516 int64_t null_count, std::shared_ptr<Table>* out) {
517 std::shared_ptr<Array> values;
518 ASSERT_OK(NullableArray<TestType>(size * 6, nullable_elements ? null_count : 0,
519 kDefaultSeed, &values));
520 std::shared_ptr<ListArray> lists;
521 ASSERT_OK(MakeListArray(values, size * 3, nullable_lists ? null_count : 0,
522 nullable_elements, &lists));
523 std::shared_ptr<ListArray> parent_lists;
524 ASSERT_OK(MakeListArray(lists, size, nullable_parent_lists ? null_count : 0,
525 nullable_lists, &parent_lists));
526 *out = MakeSimpleTable(parent_lists, nullable_parent_lists);
527 }
528
529 void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
530 std::shared_ptr<::arrow::Table> out;
531 std::unique_ptr<FileReader> reader;
532 ReaderFromSink(&reader);
533 ReadTableFromFile(std::move(reader), &out);
534 ASSERT_EQ(1, out->num_columns());
535 ASSERT_EQ(values->length(), out->num_rows());
536
537 std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
538 ASSERT_EQ(1, chunked_array->num_chunks());
539 auto result = chunked_array->chunk(0);
540
541 AssertArraysEqual(*values, *result);
542 }
543
544 void CheckRoundTrip(const std::shared_ptr<Table>& table) {
545 CheckSimpleRoundtrip(table, table->num_rows());
546 }
547
548 template <typename ArrayType>
549 void WriteColumn(const std::shared_ptr<GroupNode>& schema,
550 const std::shared_ptr<ArrayType>& values) {
551 FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema));
552 ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
553 ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*values));
554 ASSERT_OK_NO_THROW(writer.Close());
555 // writer.Close() should be idempotent
556 ASSERT_OK_NO_THROW(writer.Close());
557 }
558
559 std::shared_ptr<InMemoryOutputStream> sink_;
560};
561
// We have separate tests for UInt32Type as it is currently the only type
// whose roundtrip does not yield an identical Array structure: we write a
// UInt32 Array but read back an Int64 Array when targeting Parquet version 1.0.
566
567typedef ::testing::Types<
568 ::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type,
569 ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type,
570 ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType,
571 ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>,
572 DecimalWithPrecisionAndScale<3>, DecimalWithPrecisionAndScale<5>,
573 DecimalWithPrecisionAndScale<7>, DecimalWithPrecisionAndScale<10>,
574 DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<15>,
575 DecimalWithPrecisionAndScale<17>, DecimalWithPrecisionAndScale<19>,
576 DecimalWithPrecisionAndScale<22>, DecimalWithPrecisionAndScale<23>,
577 DecimalWithPrecisionAndScale<24>, DecimalWithPrecisionAndScale<27>,
578 DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<32>,
579 DecimalWithPrecisionAndScale<34>, DecimalWithPrecisionAndScale<38>>
580 TestTypes;
581
582TYPED_TEST_CASE(TestParquetIO, TestTypes);
583
584TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
585 std::shared_ptr<Array> values;
586 ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
587
588 std::shared_ptr<GroupNode> schema =
589 MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
590 ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, values));
591
592 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
593}
594
595TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
596 std::shared_ptr<Array> values;
597 ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
598 std::shared_ptr<Table> table = MakeSimpleTable(values, false);
599 this->sink_ = std::make_shared<InMemoryOutputStream>();
600 ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
601 values->length(), default_writer_properties()));
602
603 std::shared_ptr<Table> out;
604 std::unique_ptr<FileReader> reader;
605 ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
606 ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out));
607 ASSERT_EQ(1, out->num_columns());
608 ASSERT_EQ(100, out->num_rows());
609
610 std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
611 ASSERT_EQ(1, chunked_array->num_chunks());
612
613 AssertArraysEqual(*values, *chunked_array->chunk(0));
614}
615
616TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
617 // This also tests max_definition_level = 1
618 std::shared_ptr<Array> values;
619
620 ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
621
622 std::shared_ptr<GroupNode> schema =
623 MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
624 ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, values));
625
626 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
627}
628
629TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) {
630 // Skip tests for BOOL as we don't create dictionaries for it.
631 if (TypeParam::type_id == ::arrow::Type::BOOL) {
632 return;
633 }
634
635 std::shared_ptr<Array> values;
636
637 ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
638
639 Datum out;
640 FunctionContext ctx(default_memory_pool());
641 ASSERT_OK(DictionaryEncode(&ctx, Datum(values), &out));
642 std::shared_ptr<Array> dict_values = MakeArray(out.array());
643 std::shared_ptr<GroupNode> schema =
644 MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
645 ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, dict_values));
646
647 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
648}
649
650TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
651 std::shared_ptr<Array> values;
652 ASSERT_OK(NonNullArray<TypeParam>(2 * SMALL_SIZE, &values));
653 std::shared_ptr<GroupNode> schema =
654 MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
655
656 std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
657 ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values));
658 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values));
659
660 // Slice offset 1 higher
661 sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
662 ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values));
663 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values));
664}
665
666TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
667 std::shared_ptr<Array> values;
668 ASSERT_OK(NullableArray<TypeParam>(2 * SMALL_SIZE, SMALL_SIZE, kDefaultSeed, &values));
669 std::shared_ptr<GroupNode> schema =
670 MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
671
672 std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
673 ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values));
674 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values));
675
676 // Slice offset 1 higher, thus different null bitmap.
677 sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
678 ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values));
679 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values));
680}
681
682TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
683 // This also tests max_definition_level = 1
684 std::shared_ptr<Array> values;
685
686 ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
687 std::shared_ptr<Table> table = MakeSimpleTable(values, true);
688 ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
689}
690
691TYPED_TEST(TestParquetIO, SingleEmptyListsColumnReadWrite) {
692 std::shared_ptr<Table> table;
693 ASSERT_NO_FATAL_FAILURE(this->PrepareEmptyListsTable(SMALL_SIZE, &table));
694 ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
695}
696
697TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) {
698 std::shared_ptr<Table> table;
699 ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, true, true, 10, &table));
700 ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
701}
702
703TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) {
704 std::shared_ptr<Table> table;
705 ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, false, true, 10, &table));
706 ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
707}
708
709TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) {
710 std::shared_ptr<Table> table;
711 ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, true, false, 10, &table));
712 ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
713}
714
715TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) {
716 std::shared_ptr<Table> table;
717 ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, false, false, 0, &table));
718 ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
719}
720
721TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) {
722 std::shared_ptr<Table> table;
723 ASSERT_NO_FATAL_FAILURE(
724 this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table));
725 ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
726}
727
728TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
729 std::shared_ptr<Array> values;
730 ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
731 int64_t chunk_size = values->length() / 4;
732
733 std::shared_ptr<GroupNode> schema =
734 MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
735 FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
736 for (int i = 0; i < 4; i++) {
737 ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
738 std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
739 ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
740 }
741 ASSERT_OK_NO_THROW(writer.Close());
742
743 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
744}
745
746TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
747 std::shared_ptr<Array> values;
748 ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values));
749 std::shared_ptr<Table> table = MakeSimpleTable(values, false);
750 this->sink_ = std::make_shared<InMemoryOutputStream>();
751 ASSERT_OK_NO_THROW(WriteTable(*table, default_memory_pool(), this->sink_, 512,
752 default_writer_properties()));
753
754 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values));
755}
756
757TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
758 std::shared_ptr<Array> values;
759 ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values));
760 std::shared_ptr<Table> table = MakeSimpleTable(values, false);
761 this->sink_ = std::make_shared<InMemoryOutputStream>();
762 auto buffer = AllocateBuffer();
763
764 {
    // The BufferOutputStream is closed when it goes out of scope
766 auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer);
767 ASSERT_OK_NO_THROW(WriteTable(*table, default_memory_pool(), arrow_sink_, 512,
768 default_writer_properties()));
769
    // XXX: Remove this explicit Close() once ARROW-455 is completed
771 ASSERT_OK(arrow_sink_->Close());
772 }
773
774 auto pbuffer = std::make_shared<Buffer>(buffer->data(), buffer->size());
775
776 auto source = std::make_shared<BufferReader>(pbuffer);
777 std::shared_ptr<::arrow::Table> out;
778 std::unique_ptr<FileReader> reader;
779 ASSERT_OK_NO_THROW(OpenFile(source, ::arrow::default_memory_pool(), &reader));
780 ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out));
781 ASSERT_EQ(1, out->num_columns());
782 ASSERT_EQ(values->length(), out->num_rows());
783
784 std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
785 ASSERT_EQ(1, chunked_array->num_chunks());
786
787 AssertArraysEqual(*values, *chunked_array->chunk(0));
788}
789
790TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
791 int64_t chunk_size = SMALL_SIZE / 4;
792 std::shared_ptr<Array> values;
793
794 ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
795
796 std::shared_ptr<GroupNode> schema =
797 MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
798 FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
799 for (int i = 0; i < 4; i++) {
800 ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
801 std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
802 ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
803 }
804 ASSERT_OK_NO_THROW(writer.Close());
805
806 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
807}
808
809TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
810 // This also tests max_definition_level = 1
811 std::shared_ptr<Array> values;
812
813 ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
814 std::shared_ptr<Table> table = MakeSimpleTable(values, true);
815 this->sink_ = std::make_shared<InMemoryOutputStream>();
816 ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512,
817 default_writer_properties()));
818
819 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values));
820}
821
822TYPED_TEST(TestParquetIO, FileMetaDataWrite) {
823 std::shared_ptr<Array> values;
824 ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
825 std::shared_ptr<Table> table = MakeSimpleTable(values, false);
826 this->sink_ = std::make_shared<InMemoryOutputStream>();
827 ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
828 values->length(), default_writer_properties()));
829
830 std::unique_ptr<FileReader> reader;
831 ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
832 auto metadata = reader->parquet_reader()->metadata();
833 ASSERT_EQ(1, metadata->num_columns());
834 ASSERT_EQ(100, metadata->num_rows());
835
836 this->sink_ = std::make_shared<InMemoryOutputStream>();
837
838 ASSERT_OK_NO_THROW(::parquet::arrow::WriteFileMetaData(*metadata, this->sink_.get()));
839
840 ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
841 auto metadata_written = reader->parquet_reader()->metadata();
842 ASSERT_EQ(metadata->size(), metadata_written->size());
843 ASSERT_EQ(metadata->num_row_groups(), metadata_written->num_row_groups());
844 ASSERT_EQ(metadata->num_rows(), metadata_written->num_rows());
845 ASSERT_EQ(metadata->num_columns(), metadata_written->num_columns());
846 ASSERT_EQ(metadata->RowGroup(0)->num_rows(), metadata_written->RowGroup(0)->num_rows());
847}
848
849using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>;
850
851TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
  // This test exercises the conversion from an Impala-style (INT96) timestamp
  // to a nanoseconds-since-epoch timestamp.
854
  // 2nd January 1970, 11:35:00 plus 145738543 ns
856 Int96 day;
857 day.value[2] = UINT32_C(2440589);
858 int64_t seconds = (11 * 60 + 35) * 60;
859 Int96SetNanoSeconds(
860 day, seconds * INT64_C(1000) * INT64_C(1000) * INT64_C(1000) + 145738543);
861 // Compute the corresponding nanosecond timestamp
862 struct tm datetime;
863 memset(&datetime, 0, sizeof(struct tm));
864 datetime.tm_year = 70;
865 datetime.tm_mon = 0;
866 datetime.tm_mday = 2;
867 datetime.tm_hour = 11;
868 datetime.tm_min = 35;
869 struct tm epoch;
870 memset(&epoch, 0, sizeof(struct tm));
871
872 epoch.tm_year = 70;
873 epoch.tm_mday = 1;
874 // Nanoseconds since the epoch
875 int64_t val = lrint(difftime(mktime(&datetime), mktime(&epoch))) * INT64_C(1000000000);
876 val += 145738543;
877
878 std::vector<std::shared_ptr<schema::Node>> fields(
879 {schema::PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)});
880 std::shared_ptr<schema::GroupNode> schema = std::static_pointer_cast<GroupNode>(
881 schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
882
883 // We cannot write this column with Arrow, so we have to use the plain parquet-cpp API
884 // to write an Int96 file.
885 this->sink_ = std::make_shared<InMemoryOutputStream>();
886 auto writer = ParquetFileWriter::Open(this->sink_, schema);
887 RowGroupWriter* rg_writer = writer->AppendRowGroup();
888 ColumnWriter* c_writer = rg_writer->NextColumn();
889 auto typed_writer = dynamic_cast<TypedColumnWriter<Int96Type>*>(c_writer);
890 ASSERT_NE(typed_writer, nullptr);
891 typed_writer->WriteBatch(1, nullptr, nullptr, &day);
892 c_writer->Close();
893 rg_writer->Close();
894 writer->Close();
895
896 ::arrow::TimestampBuilder builder(::arrow::timestamp(TimeUnit::NANO),
897 ::arrow::default_memory_pool());
898 ASSERT_OK(builder.Append(val));
899 std::shared_ptr<Array> values;
900 ASSERT_OK(builder.Finish(&values));
901 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
902}
903
904using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
905
TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compatibility) {
907 // This also tests max_definition_level = 1
908 std::shared_ptr<Array> values;
909
910 ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &values));
911 std::shared_ptr<Table> table = MakeSimpleTable(values, true);
912
  // A Parquet 2.0 roundtrip should yield a uint32_t column again
914 this->sink_ = std::make_shared<InMemoryOutputStream>();
915 std::shared_ptr<::parquet::WriterProperties> properties =
916 ::parquet::WriterProperties::Builder()
917 .version(ParquetVersion::PARQUET_2_0)
918 ->build();
919 ASSERT_OK_NO_THROW(
920 WriteTable(*table, default_memory_pool(), this->sink_, 512, properties));
921 ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values));
922}
923
TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) {
925 // This also tests max_definition_level = 1
926 std::shared_ptr<Array> arr;
927 ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &arr));
928
929 std::shared_ptr<::arrow::UInt32Array> values =
930 std::dynamic_pointer_cast<::arrow::UInt32Array>(arr);
931
932 std::shared_ptr<Table> table = MakeSimpleTable(values, true);
933
934 // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0
935 // reader that a column is unsigned.
936 this->sink_ = std::make_shared<InMemoryOutputStream>();
937 std::shared_ptr<::parquet::WriterProperties> properties =
938 ::parquet::WriterProperties::Builder()
939 .version(ParquetVersion::PARQUET_1_0)
940 ->build();
941 ASSERT_OK_NO_THROW(
942 WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties));
943
944 std::shared_ptr<ResizableBuffer> int64_data = AllocateBuffer();
945 {
946 ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
947 auto int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
948 auto uint32_data_ptr = reinterpret_cast<const uint32_t*>(values->values()->data());
949 const auto cast_uint32_to_int64 = [](uint32_t value) {
950 return static_cast<int64_t>(value);
951 };
952 std::transform(uint32_data_ptr, uint32_data_ptr + values->length(), int64_data_ptr,
953 cast_uint32_to_int64);
954 }
955
956 std::vector<std::shared_ptr<Buffer>> buffers{values->null_bitmap(), int64_data};
957 auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(),
958 buffers, values->null_count());
959 std::shared_ptr<Array> expected_values = MakeArray(arr_data);
960 ASSERT_NE(expected_values, NULLPTR);
961
962 const auto& expected = static_cast<const ::arrow::Int64Array&>(*expected_values);
963 ASSERT_GT(values->length(), 0);
964 ASSERT_EQ(values->length(), expected.length());
965
966 // TODO(phillipc): Is there a better way to compare these two arrays?
967 // AssertArraysEqual requires the same type, but we only care about values in this case
968 for (int i = 0; i < expected.length(); ++i) {
969 const bool value_is_valid = values->IsValid(i);
970 const bool expected_value_is_valid = expected.IsValid(i);
971
972 ASSERT_EQ(expected_value_is_valid, value_is_valid);
973
974 if (value_is_valid) {
975 uint32_t value = values->Value(i);
976 int64_t expected_value = expected.Value(i);
977 ASSERT_EQ(expected_value, static_cast<int64_t>(value));
978 }
979 }
980}
981
982using TestStringParquetIO = TestParquetIO<::arrow::StringType>;
983
984TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
985 std::shared_ptr<Array> values;
986 ::arrow::StringBuilder builder;
987 for (size_t i = 0; i < SMALL_SIZE; i++) {
988 ASSERT_OK(builder.Append(""));
989 }
990 ASSERT_OK(builder.Finish(&values));
991 std::shared_ptr<Table> table = MakeSimpleTable(values, false);
992 this->sink_ = std::make_shared<InMemoryOutputStream>();
993 ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
994 values->length(), default_writer_properties()));
995
996 std::shared_ptr<Table> out;
997 std::unique_ptr<FileReader> reader;
998 ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
999 ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out));
1000 ASSERT_EQ(1, out->num_columns());
1001 ASSERT_EQ(100, out->num_rows());
1002
1003 std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
1004 ASSERT_EQ(1, chunked_array->num_chunks());
1005
1006 AssertArraysEqual(*values, *chunked_array->chunk(0));
1007}
1008
1009using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
1010
1011TEST_F(TestNullParquetIO, NullColumn) {
1012 for (int32_t num_rows : {0, SMALL_SIZE}) {
1013 std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(num_rows);
1014 std::shared_ptr<Table> table = MakeSimpleTable(values, true /* nullable */);
1015 this->sink_ = std::make_shared<InMemoryOutputStream>();
1016
1017 const int64_t chunk_size = std::max(static_cast<int64_t>(1), table->num_rows());
1018 ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
1019 chunk_size, default_writer_properties()));
1020
1021 std::shared_ptr<Table> out;
1022 std::unique_ptr<FileReader> reader;
1023 ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
1024 ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out));
1025 ASSERT_EQ(1, out->num_columns());
1026 ASSERT_EQ(num_rows, out->num_rows());
1027
1028 std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
1029 ASSERT_EQ(1, chunked_array->num_chunks());
1030 AssertArraysEqual(*values, *chunked_array->chunk(0));
1031 }
1032}
1033
1034TEST_F(TestNullParquetIO, NullListColumn) {
1035 std::vector<int32_t> offsets1 = {0};
1036 std::vector<int32_t> offsets2 = {0, 2, 2, 3, 115};
1037 for (std::vector<int32_t> offsets : {offsets1, offsets2}) {
1038 std::shared_ptr<Array> offsets_array, values_array, list_array;
1039 ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offsets, &offsets_array);
1040 values_array = std::make_shared<::arrow::NullArray>(offsets.back());
1041 ASSERT_OK(::arrow::ListArray::FromArrays(*offsets_array, *values_array,
1042 default_memory_pool(), &list_array));
1043
1044 std::shared_ptr<Table> table = MakeSimpleTable(list_array, false /* nullable */);
1045 this->sink_ = std::make_shared<InMemoryOutputStream>();
1046
1047 const int64_t chunk_size = std::max(static_cast<int64_t>(1), table->num_rows());
1048 ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
1049 chunk_size, default_writer_properties()));
1050
1051 std::shared_ptr<Table> out;
1052 std::unique_ptr<FileReader> reader;
1053 this->ReaderFromSink(&reader);
1054 this->ReadTableFromFile(std::move(reader), &out);
1055 ASSERT_EQ(1, out->num_columns());
1056 ASSERT_EQ(offsets.size() - 1, out->num_rows());
1057
1058 std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
1059 ASSERT_EQ(1, chunked_array->num_chunks());
1060 AssertArraysEqual(*list_array, *chunked_array->chunk(0));
1061 }
1062}
1063
1064TEST_F(TestNullParquetIO, NullDictionaryColumn) {
1065 std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(0);
1066 std::shared_ptr<Array> indices =
1067 std::make_shared<::arrow::Int8Array>(SMALL_SIZE, nullptr, nullptr, SMALL_SIZE);
1068 std::shared_ptr<::arrow::DictionaryType> dict_type =
1069 std::make_shared<::arrow::DictionaryType>(::arrow::int8(), values);
1070 std::shared_ptr<Array> dict_values =
1071 std::make_shared<::arrow::DictionaryArray>(dict_type, indices);
1072 std::shared_ptr<Table> table = MakeSimpleTable(dict_values, true);
1073 this->sink_ = std::make_shared<InMemoryOutputStream>();
1074 ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
1075 dict_values->length(), default_writer_properties()));
1076
1077 std::shared_ptr<Table> out;
1078 std::unique_ptr<FileReader> reader;
1079 ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
1080 ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out));
1081 ASSERT_EQ(1, out->num_columns());
1082 ASSERT_EQ(100, out->num_rows());
1083
1084 std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
1085 ASSERT_EQ(1, chunked_array->num_chunks());
1086
1087 std::shared_ptr<Array> expected_values =
1088 std::make_shared<::arrow::NullArray>(SMALL_SIZE);
1089 AssertArraysEqual(*expected_values, *chunked_array->chunk(0));
1090}
1091
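// Helpers mapping the Arrow type under test to the C type Parquet expects for
// its physical representation (ParquetCDataType) and to the C type used to
// build the expected value vectors (c_type_trait); booleans are widened to
// uint8_t for the latter.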
1092template <typename T>
1093using ParquetCDataType = typename ParquetDataType<T>::c_type;
1094
1095template <typename T>
1096struct c_type_trait {
1097 using ArrowCType = typename T::c_type;
1098};
1099
1100template <>
1101struct c_type_trait<::arrow::BooleanType> {
1102 using ArrowCType = uint8_t;
1103};
1104
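// Fixture that writes columns through the low-level Parquet column writer and
// checks that they read back correctly through the Arrow reader.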
1105template <typename TestType>
1106class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
1107 public:
1108 typedef typename c_type_trait<TestType>::ArrowCType T;
1109
1110 void MakeTestFile(std::vector<T>& values, int num_chunks,
1111 std::unique_ptr<FileReader>* reader) {
1112 TestType dummy;
1113
1114 std::shared_ptr<GroupNode> schema = MakeSimpleSchema(dummy, Repetition::REQUIRED);
1115 std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
1116 size_t chunk_size = values.size() / num_chunks;
1117 // Convert to Parquet's expected physical type
1118 std::vector<uint8_t> values_buffer(sizeof(ParquetCDataType<TestType>) *
1119 values.size());
1120 auto values_parquet =
1121 reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
1122 std::copy(values.cbegin(), values.cend(), values_parquet);
1123 for (int i = 0; i < num_chunks; i++) {
1124 auto row_group_writer = file_writer->AppendRowGroup();
1125 auto column_writer =
1126 static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
1127 ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
1128 column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
1129 column_writer->Close();
1130 row_group_writer->Close();
1131 }
1132 file_writer->Close();
1133 this->ReaderFromSink(reader);
1134 }
1135
1136 void CheckSingleColumnRequiredTableRead(int num_chunks) {
1137 std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
1138 std::unique_ptr<FileReader> file_reader;
1139 ASSERT_NO_FATAL_FAILURE(MakeTestFile(values, num_chunks, &file_reader));
1140
1141 std::shared_ptr<Table> out;
1142 this->ReadTableFromFile(std::move(file_reader), &out);
1143 ASSERT_EQ(1, out->num_columns());
1144 ASSERT_EQ(SMALL_SIZE, out->num_rows());
1145
1146 std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
1147 ASSERT_EQ(1, chunked_array->num_chunks());
1148 ExpectArrayT<TestType>(values.data(), chunked_array->chunk(0).get());
1149 }
1150
1151 void CheckSingleColumnRequiredRead(int num_chunks) {
1152 std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
1153 std::unique_ptr<FileReader> file_reader;
1154 ASSERT_NO_FATAL_FAILURE(MakeTestFile(values, num_chunks, &file_reader));
1155
1156 std::shared_ptr<Array> out;
1157 this->ReadSingleColumnFile(std::move(file_reader), &out);
1158
1159 ExpectArrayT<TestType>(values.data(), out.get());
1160 }
1161};
1162
1163typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
1164 ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type,
1165 ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type,
1166 ::arrow::FloatType, ::arrow::DoubleType>
1167 PrimitiveTestTypes;
1168
1169TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
1170
1171TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
1172 ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredRead(1));
1173}
1174
1175TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
1176 ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredTableRead(1));
1177}
1178
1179TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
1180 ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredRead(4));
1181}
1182
1183TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
1184 ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredTableRead(4));
1185}
1186
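// Build a table of date/time columns for roundtrip tests; when nanos_as_micros
// is true, the nanosecond column is replaced by the microsecond values expected
// after coercion to the INT64/TIMESTAMP_MICROS representation.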
1187void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros = false) {
1188 using ::arrow::ArrayFromVector;
1189
1190 std::vector<bool> is_valid = {true, true, true, false, true, true};
1191
  // These are the only types that roundtrip without modification
1193 auto f0 = field("f0", ::arrow::date32());
1194 auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
1195 auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
1196 auto f3_unit = nanos_as_micros ? TimeUnit::MICRO : TimeUnit::NANO;
1197 auto f3 = field("f3", ::arrow::timestamp(f3_unit));
1198 auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI));
1199 auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO));
1200
1201 std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5}));
1202
1203 std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000,
1204 1489272000, 1489272000, 1489273000};
1205 std::vector<int64_t> t64_ns_values = {1489269000000, 1489270000000, 1489271000000,
1206 1489272000000, 1489272000000, 1489273000000};
1207 std::vector<int64_t> t64_us_values = {1489269000, 1489270000, 1489271000,
1208 1489272000, 1489272000, 1489273000};
1209 std::vector<int64_t> t64_ms_values = {1489269, 1489270, 1489271,
1210 1489272, 1489272, 1489273};
1211
1212 std::shared_ptr<Array> a0, a1, a2, a3, a4, a5;
1213 ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
1214 ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values,
1215 &a1);
1216 ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values,
1217 &a2);
1218 auto f3_data = nanos_as_micros ? t64_us_values : t64_ns_values;
1219 ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, f3_data, &a3);
1220 ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4);
1221 ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5);
1222
1223 std::vector<std::shared_ptr<::arrow::Column>> columns = {
1224 std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
1225 std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
1226 std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5)};
1227
1228 *out = Table::Make(schema, columns);
1229}
1230
1231TEST(TestArrowReadWrite, DateTimeTypes) {
1232 std::shared_ptr<Table> table, result;
1233 MakeDateTimeTypesTable(&table);
1234
  // Nanoseconds are cast to microseconds and stored using the INT64 physical type
1236 ASSERT_NO_FATAL_FAILURE(
1237 DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
1238 MakeDateTimeTypesTable(&table, true);
1239
1240 ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
1241}
1242
1243TEST(TestArrowReadWrite, UseDeprecatedInt96) {
1244 using ::arrow::ArrayFromVector;
1245 using ::arrow::field;
1246 using ::arrow::schema;
1247
1248 std::vector<bool> is_valid = {true, true, true, false, true, true};
1249
1250 auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
1251 auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
1252 auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
1253 auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
1254
1255 std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
1256 std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000,
1257 1489272001, 1489272000, 1489273000};
1258 std::vector<int64_t> us_values = {1489269000000, 1489270000000, 1489271000000,
1259 1489272000001, 1489272000000, 1489273000000};
1260 std::vector<int64_t> ns_values = {1489269000000000LL, 1489270000000000LL,
1261 1489271000000000LL, 1489272000000001LL,
1262 1489272000000000LL, 1489273000000000LL};
1263
1264 std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
1265 ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
1266 ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
1267 ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
1268 ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);
1269
1270 // Each input is typed with a unique TimeUnit
1271 auto input_schema = schema(
1272 {field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us), field("f_ns", t_ns)});
1273 auto input = Table::Make(
1274 input_schema,
1275 {std::make_shared<Column>("f_s", a_s), std::make_shared<Column>("f_ms", a_ms),
1276 std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_ns)});
1277
  // When reading Parquet files, all INT96 schema fields are converted to
  // nanosecond timestamps
1280 auto ex_schema = schema({field("f_s", t_ns), field("f_ms", t_ns), field("f_us", t_ns),
1281 field("f_ns", t_ns)});
1282 auto ex_result = Table::Make(
1283 ex_schema,
1284 {std::make_shared<Column>("f_s", a_ns), std::make_shared<Column>("f_ms", a_ns),
1285 std::make_shared<Column>("f_us", a_ns), std::make_shared<Column>("f_ns", a_ns)});
1286
1287 std::shared_ptr<Table> result;
1288 ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
1289 input, false /* use_threads */, input->num_rows(), {}, &result,
1290 ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
1291
1292 ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
1293
  // Ensure enable_deprecated_int96_timestamps takes precedence over
  // coerce_timestamps.
1296 ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(input, false /* use_threads */,
1297 input->num_rows(), {}, &result,
1298 ArrowWriterProperties::Builder()
1299 .enable_deprecated_int96_timestamps()
1300 ->coerce_timestamps(TimeUnit::MILLI)
1301 ->build()));
1302
1303 ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
1304}
1305
1306TEST(TestArrowReadWrite, CoerceTimestamps) {
1307 using ::arrow::ArrayFromVector;
1308 using ::arrow::field;
1309
1310 // PARQUET-1078, coerce Arrow timestamps to either TIMESTAMP_MILLIS or TIMESTAMP_MICROS
1311 std::vector<bool> is_valid = {true, true, true, false, true, true};
1312
1313 auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
1314 auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
1315 auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
1316 auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
1317
1318 std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
1319 std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000,
1320 1489272001, 1489272000, 1489273000};
1321 std::vector<int64_t> us_values = {1489269000000, 1489270000000, 1489271000000,
1322 1489272000001, 1489272000000, 1489273000000};
1323 std::vector<int64_t> ns_values = {1489269000000000LL, 1489270000000000LL,
1324 1489271000000000LL, 1489272000000001LL,
1325 1489272000000000LL, 1489273000000000LL};
1326
1327 std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
1328 ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
1329 ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
1330 ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
1331 ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);
1332
1333 // Input table, all data as is
1334 auto s1 = std::shared_ptr<::arrow::Schema>(
1335 new ::arrow::Schema({field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us),
1336 field("f_ns", t_ns)}));
1337 auto input = Table::Make(
1338 s1,
1339 {std::make_shared<Column>("f_s", a_s), std::make_shared<Column>("f_ms", a_ms),
1340 std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_ns)});
1341
1342 // Result when coercing to milliseconds
1343 auto s2 = std::shared_ptr<::arrow::Schema>(
1344 new ::arrow::Schema({field("f_s", t_ms), field("f_ms", t_ms), field("f_us", t_ms),
1345 field("f_ns", t_ms)}));
1346 auto ex_milli_result = Table::Make(
1347 s2,
1348 {std::make_shared<Column>("f_s", a_ms), std::make_shared<Column>("f_ms", a_ms),
1349 std::make_shared<Column>("f_us", a_ms), std::make_shared<Column>("f_ns", a_ms)});
1350
1351 std::shared_ptr<Table> milli_result;
1352 ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
1353 input, false /* use_threads */, input->num_rows(), {}, &milli_result,
1354 ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
1355 ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result));
1356
1357 // Result when coercing to microseconds
1358 auto s3 = std::shared_ptr<::arrow::Schema>(
1359 new ::arrow::Schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us),
1360 field("f_ns", t_us)}));
1361 auto ex_micro_result = Table::Make(
1362 s3,
1363 {std::make_shared<Column>("f_s", a_us), std::make_shared<Column>("f_ms", a_us),
1364 std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_us)});
1365
1366 std::shared_ptr<Table> micro_result;
1367 ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
1368 input, false /* use_threads */, input->num_rows(), {}, &micro_result,
1369 ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build()));
1370 ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_micro_result, *micro_result));
1371}
1372
1373TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
1374 using ::arrow::ArrayFromVector;
1375 using ::arrow::field;
1376
1377 // PARQUET-1078, coerce Arrow timestamps to either TIMESTAMP_MILLIS or TIMESTAMP_MICROS
1378 std::vector<bool> is_valid = {true, true, true, false, true, true};
1379
1380 auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
1381 auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
1382 auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
1383 auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
1384
1385 std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
1386 std::vector<int64_t> ms_values = {1489269001, 1489270001, 1489271001,
1387 1489272001, 1489272001, 1489273001};
1388 std::vector<int64_t> us_values = {1489269000001, 1489270000001, 1489271000001,
1389 1489272000001, 1489272000001, 1489273000001};
1390 std::vector<int64_t> ns_values = {1489269000000001LL, 1489270000000001LL,
1391 1489271000000001LL, 1489272000000001LL,
1392 1489272000000001LL, 1489273000000001LL};
1393
1394 std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
1395 ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
1396 ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
1397 ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
1398 ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);
1399
1400 auto s1 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_s", t_s)}));
1401 auto s2 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ms", t_ms)}));
1402 auto s3 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_us", t_us)}));
1403 auto s4 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ns", t_ns)}));
1404
1405 auto c1 = std::make_shared<Column>("f_s", a_s);
1406 auto c2 = std::make_shared<Column>("f_ms", a_ms);
1407 auto c3 = std::make_shared<Column>("f_us", a_us);
1408 auto c4 = std::make_shared<Column>("f_ns", a_ns);
1409
1410 auto t1 = Table::Make(s1, {c1});
1411 auto t2 = Table::Make(s2, {c2});
1412 auto t3 = Table::Make(s3, {c3});
1413 auto t4 = Table::Make(s4, {c4});
1414
1415 auto sink = std::make_shared<InMemoryOutputStream>();
1416
1417 // OK to write to millis
1418 auto coerce_millis =
1419 (ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build());
1420 ASSERT_OK_NO_THROW(WriteTable(*t1, ::arrow::default_memory_pool(), sink, 10,
1421 default_writer_properties(), coerce_millis));
1422 ASSERT_OK_NO_THROW(WriteTable(*t2, ::arrow::default_memory_pool(), sink, 10,
1423 default_writer_properties(), coerce_millis));
1424
1425 // Loss of precision
1426 ASSERT_RAISES(Invalid, WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
1427 default_writer_properties(), coerce_millis));
1428 ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
1429 default_writer_properties(), coerce_millis));
1430
1431 // OK to lose precision if we explicitly allow it
1432 auto allow_truncation = (ArrowWriterProperties::Builder()
1433 .coerce_timestamps(TimeUnit::MILLI)
1434 ->allow_truncated_timestamps()
1435 ->build());
1436 ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
1437 default_writer_properties(), allow_truncation));
1438 ASSERT_OK_NO_THROW(WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
1439 default_writer_properties(), allow_truncation));
1440
1441 // OK to write micros to micros
1442 auto coerce_micros =
1443 (ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
1444 ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
1445 default_writer_properties(), coerce_micros));
1446
1447 // Loss of precision
1448 ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
1449 default_writer_properties(), coerce_micros));
1450}
1451
1452TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
1453 using ::arrow::ArrayFromVector;
1454
1455 std::vector<bool> is_valid = {true, true, true, false, true, true};
1456
1457 auto f0 = field("f0", ::arrow::date64());
1458 auto f1 = field("f1", ::arrow::time32(TimeUnit::SECOND));
1459 auto f2 = field("f2", ::arrow::date64());
1460 auto f3 = field("f3", ::arrow::time32(TimeUnit::SECOND));
1461
1462 auto schema = ::arrow::schema({f0, f1, f2, f3});
1463
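  // Inputs: date64 values are midnight-aligned milliseconds since the epoch
  // (1489190400000 ms == day 17236 == 2017-03-11) and time32 values are seconds;
  // they are expected to read back as date32 days and time32 milliseconds.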
1464 std::vector<int64_t> a0_values = {1489190400000, 1489276800000, 1489363200000,
1465 1489449600000, 1489536000000, 1489622400000};
1466 std::vector<int32_t> a1_values = {0, 1, 2, 3, 4, 5};
1467
1468 std::shared_ptr<Array> a0, a1, a0_nonnull, a1_nonnull, x0, x1, x0_nonnull, x1_nonnull;
1469
1470 ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, a0_values, &a0);
1471 ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), a0_values, &a0_nonnull);
1472
1473 ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), is_valid, a1_values, &a1);
1474 ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), a1_values, &a1_nonnull);
1475
1476 std::vector<std::shared_ptr<::arrow::Column>> columns = {
1477 std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
1478 std::make_shared<Column>("f2", a0_nonnull),
1479 std::make_shared<Column>("f3", a1_nonnull)};
1480 auto table = Table::Make(schema, columns);
1481
1482 // Expected schema and values
1483 auto e0 = field("f0", ::arrow::date32());
1484 auto e1 = field("f1", ::arrow::time32(TimeUnit::MILLI));
1485 auto e2 = field("f2", ::arrow::date32());
1486 auto e3 = field("f3", ::arrow::time32(TimeUnit::MILLI));
1487 auto ex_schema = ::arrow::schema({e0, e1, e2, e3});
1488
1489 std::vector<int32_t> x0_values = {17236, 17237, 17238, 17239, 17240, 17241};
1490 std::vector<int32_t> x1_values = {0, 1000, 2000, 3000, 4000, 5000};
1491 ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), is_valid, x0_values, &x0);
1492 ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), x0_values, &x0_nonnull);
1493
1494 ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), is_valid, x1_values, &x1);
1495 ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), x1_values, &x1_nonnull);
1496
1497 std::vector<std::shared_ptr<::arrow::Column>> ex_columns = {
1498 std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1),
1499 std::make_shared<Column>("f2", x0_nonnull),
1500 std::make_shared<Column>("f3", x1_nonnull)};
1501 auto ex_table = Table::Make(ex_schema, ex_columns);
1502
1503 std::shared_ptr<Table> result;
1504 ASSERT_NO_FATAL_FAILURE(
1505 DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
1506
1507 ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_table, *result));
1508}
1509
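// Build a table of num_columns nullable double columns, each repeating the same
// generated chunk nchunks times.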
1510void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
1511 std::shared_ptr<Table>* out) {
1512 std::shared_ptr<::arrow::Column> column;
1513 std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns);
1514 std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns);
1515
1516 for (int i = 0; i < num_columns; ++i) {
1517 std::vector<std::shared_ptr<Array>> arrays;
1518 std::shared_ptr<Array> values;
1519 ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10,
1520 static_cast<uint32_t>(i), &values));
1521 std::stringstream ss;
1522 ss << "col" << i;
1523
1524 for (int j = 0; j < nchunks; ++j) {
1525 arrays.push_back(values);
1526 }
1527 column = MakeColumn(ss.str(), arrays, true);
1528
1529 columns[i] = column;
1530 fields[i] = column->field();
1531 }
1532 auto schema = std::make_shared<::arrow::Schema>(fields);
1533 *out = Table::Make(schema, columns);
1534}
1535
1536void MakeListArray(int num_rows, int max_value_length,
1537 std::shared_ptr<::DataType>* out_type,
1538 std::shared_ptr<Array>* out_array) {
1539 std::vector<int32_t> length_draws;
1540 randint(num_rows, 0, max_value_length, &length_draws);
1541
1542 std::vector<int32_t> offset_values;
1543
1544 // Make sure some of them are length 0
1545 int32_t total_elements = 0;
1546 for (size_t i = 0; i < length_draws.size(); ++i) {
1547 if (length_draws[i] < max_value_length / 10) {
1548 length_draws[i] = 0;
1549 }
1550 offset_values.push_back(total_elements);
1551 total_elements += length_draws[i];
1552 }
1553 offset_values.push_back(total_elements);
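  // offset_values has num_rows + 1 entries; the final entry records the total
  // number of child values.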
1554
1555 std::vector<int8_t> value_draws;
1556 randint(total_elements, 0, 100, &value_draws);
1557
1558 std::vector<bool> is_valid;
1559 random_is_valid(total_elements, 0.1, &is_valid);
1560
1561 std::shared_ptr<Array> values, offsets;
1562 ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid,
1563 value_draws, &values);
1564 ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets);
1565
1566 ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(),
1567 out_array));
1568
1569 *out_type = ::arrow::list(::arrow::int8());
1570}
1571
1572TEST(TestArrowReadWrite, MultithreadedRead) {
1573 const int num_columns = 20;
1574 const int num_rows = 1000;
1575 const bool use_threads = true;
1576
1577 std::shared_ptr<Table> table;
1578 ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
1579
1580 std::shared_ptr<Table> result;
1581 ASSERT_NO_FATAL_FAILURE(
1582 DoSimpleRoundtrip(table, use_threads, table->num_rows(), {}, &result));
1583
1584 ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
1585}
1586
1587TEST(TestArrowReadWrite, ReadSingleRowGroup) {
1588 const int num_columns = 20;
1589 const int num_rows = 1000;
1590
1591 std::shared_ptr<Table> table;
1592 ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
1593
1594 std::shared_ptr<Buffer> buffer;
1595 ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
1596 default_arrow_writer_properties(), &buffer));
1597
1598 std::unique_ptr<FileReader> reader;
1599 ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
1600 ::arrow::default_memory_pool(),
1601 ::parquet::default_reader_properties(), nullptr, &reader));
1602
1603 ASSERT_EQ(2, reader->num_row_groups());
1604
1605 std::shared_ptr<Table> r1, r2, r3, r4;
1606  // Read the row groups individually, in combination, and as subsets
1607 ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
1608 ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2));
1609 ASSERT_OK_NO_THROW(reader->ReadRowGroups({0, 1}, &r3));
1610 ASSERT_OK_NO_THROW(reader->ReadRowGroups({1}, &r4));
1611
1612 std::shared_ptr<Table> concatenated;
1613
1614 ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
1615 ASSERT_TRUE(table->Equals(*concatenated));
1616
1617 ASSERT_TRUE(table->Equals(*r3));
1618 ASSERT_TRUE(r2->Equals(*r4));
1619 ASSERT_OK(ConcatenateTables({r1, r4}, &concatenated));
1620 ASSERT_TRUE(table->Equals(*concatenated));
1621}
1622
1623TEST(TestArrowReadWrite, GetRecordBatchReader) {
1624 const int num_columns = 20;
1625 const int num_rows = 1000;
1626
1627 std::shared_ptr<Table> table;
1628 ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
1629
1630 std::shared_ptr<Buffer> buffer;
1631 ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
1632 default_arrow_writer_properties(), &buffer));
1633
1634 std::unique_ptr<FileReader> reader;
1635 ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
1636 ::arrow::default_memory_pool(),
1637 ::parquet::default_reader_properties(), nullptr, &reader));
1638
1639 std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
1640 ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader));
1641
1642 std::shared_ptr<::arrow::RecordBatch> batch;
1643
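  // The file was written with row groups of num_rows / 2 = 500 rows, so the
  // batch reader over row groups {0, 1} should yield two 500-row batches and
  // then a null batch at end-of-stream.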
1644 ASSERT_OK(rb_reader->ReadNext(&batch));
1645 ASSERT_EQ(500, batch->num_rows());
1646 ASSERT_EQ(20, batch->num_columns());
1647
1648 ASSERT_OK(rb_reader->ReadNext(&batch));
1649 ASSERT_EQ(500, batch->num_rows());
1650 ASSERT_EQ(20, batch->num_columns());
1651
1652 ASSERT_OK(rb_reader->ReadNext(&batch));
1653 ASSERT_EQ(nullptr, batch);
1654}
1655
1656TEST(TestArrowReadWrite, ScanContents) {
1657 const int num_columns = 20;
1658 const int num_rows = 1000;
1659
1660 std::shared_ptr<Table> table;
1661 ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
1662
1663 std::shared_ptr<Buffer> buffer;
1664 ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
1665 default_arrow_writer_properties(), &buffer));
1666
1667 std::unique_ptr<FileReader> reader;
1668 ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
1669 ::arrow::default_memory_pool(),
1670 ::parquet::default_reader_properties(), nullptr, &reader));
1671
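  // Scan all columns, then a three-column subset, with a batch size of 256;
  // both scans should report the full row count.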
1672 int64_t num_rows_returned = 0;
1673 ASSERT_OK_NO_THROW(reader->ScanContents({}, 256, &num_rows_returned));
1674 ASSERT_EQ(num_rows, num_rows_returned);
1675
1676 ASSERT_OK_NO_THROW(reader->ScanContents({0, 1, 2}, 256, &num_rows_returned));
1677 ASSERT_EQ(num_rows, num_rows_returned);
1678}
1679
1680TEST(TestArrowReadWrite, ReadColumnSubset) {
1681 const int num_columns = 20;
1682 const int num_rows = 1000;
1683 const bool use_threads = true;
1684
1685 std::shared_ptr<Table> table;
1686 ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
1687
1688 std::shared_ptr<Table> result;
1689 std::vector<int> column_subset = {0, 4, 8, 10};
1690 ASSERT_NO_FATAL_FAILURE(
1691 DoSimpleRoundtrip(table, use_threads, table->num_rows(), column_subset, &result));
1692
1693 std::vector<std::shared_ptr<::arrow::Column>> ex_columns;
1694 std::vector<std::shared_ptr<::arrow::Field>> ex_fields;
1695 for (int i : column_subset) {
1696 ex_columns.push_back(table->column(i));
1697 ex_fields.push_back(table->column(i)->field());
1698 }
1699
1700 auto ex_schema = ::arrow::schema(ex_fields);
1701 auto expected = Table::Make(ex_schema, ex_columns);
1702 ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*expected, *result));
1703}
1704
1705TEST(TestArrowReadWrite, ListLargeRecords) {
1706 // PARQUET-1308: This test passed on Linux when num_rows was smaller
1707 const int num_rows = 2000;
1708 const int row_group_size = 100;
1709
1710 std::shared_ptr<Array> list_array;
1711 std::shared_ptr<::DataType> list_type;
1712
1713 MakeListArray(num_rows, 20, &list_type, &list_array);
1714
1715 auto schema = ::arrow::schema({::arrow::field("a", list_type)});
1716
1717 std::shared_ptr<Table> table = Table::Make(schema, {list_array});
1718
1719 std::shared_ptr<Buffer> buffer;
1720 ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
1721 default_arrow_writer_properties(), &buffer));
1722
1723 std::unique_ptr<FileReader> reader;
1724 ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
1725 ::arrow::default_memory_pool(),
1726 ::parquet::default_reader_properties(), nullptr, &reader));
1727
1728 // Read everything
1729 std::shared_ptr<Table> result;
1730 ASSERT_OK_NO_THROW(reader->ReadTable(&result));
1731 ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
1732
1733 // Read 1 record at a time
1734 ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
1735 ::arrow::default_memory_pool(),
1736 ::parquet::default_reader_properties(), nullptr, &reader));
1737
1738 std::unique_ptr<ColumnReader> col_reader;
1739 ASSERT_OK(reader->GetColumn(0, &col_reader));
1740
1741 std::vector<std::shared_ptr<Array>> pieces;
1742 for (int i = 0; i < num_rows; ++i) {
1743 std::shared_ptr<ChunkedArray> chunked_piece;
1744 ASSERT_OK(col_reader->NextBatch(1, &chunked_piece));
1745 ASSERT_EQ(1, chunked_piece->length());
1746 ASSERT_EQ(1, chunked_piece->num_chunks());
1747 pieces.push_back(chunked_piece->chunk(0));
1748 }
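  // Reassemble the single-row pieces into one chunked column and compare it
  // against the original table.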
1749 auto chunked = std::make_shared<::arrow::ChunkedArray>(pieces);
1750
1751 auto chunked_col =
1752 std::make_shared<::arrow::Column>(table->schema()->field(0), chunked);
1753 std::vector<std::shared_ptr<::arrow::Column>> columns = {chunked_col};
1754 auto chunked_table = Table::Make(table->schema(), columns);
1755
1756 ASSERT_TRUE(table->Equals(*chunked_table));
1757}
1758
1759typedef std::function<void(int, std::shared_ptr<::DataType>*, std::shared_ptr<Array>*)>
1760 ArrayFactory;
1761
1762template <typename ArrowType>
1763struct GenerateArrayFunctor {
1764 explicit GenerateArrayFunctor(double pct_null = 0.1) : pct_null(pct_null) {}
1765
1766 void operator()(int length, std::shared_ptr<::DataType>* type,
1767 std::shared_ptr<Array>* array) {
1768 using T = typename ArrowType::c_type;
1769
1770 // TODO(wesm): generate things other than integers
1771 std::vector<T> draws;
1772 randint(length, 0, 100, &draws);
1773
1774 std::vector<bool> is_valid;
1775 random_is_valid(length, this->pct_null, &is_valid);
1776
1777 *type = ::arrow::TypeTraits<ArrowType>::type_singleton();
1778 ::arrow::ArrayFromVector<ArrowType, T>(*type, is_valid, draws, array);
1779 }
1780
1781 double pct_null;
1782};
1783
1787auto GenerateInt32 = [](int length, std::shared_ptr<::DataType>* type,
1788 std::shared_ptr<Array>* array) {
1789 GenerateArrayFunctor<::arrow::Int32Type> func;
1790 func(length, type, array);
1791};
1792
1793auto GenerateList = [](int length, std::shared_ptr<::DataType>* type,
1794 std::shared_ptr<Array>* array) {
1795 MakeListArray(length, 100, type, array);
1796};
1797
1798TEST(TestArrowReadWrite, TableWithChunkedColumns) {
1799 std::vector<ArrayFactory> functions = {GenerateInt32, GenerateList};
1800
1801 std::vector<int> chunk_sizes = {2, 4, 10, 2};
1802 const int64_t total_length = 18;
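  // The chunk sizes above sum to total_length, so the slices below tile the
  // generated array exactly.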
1803
1804 for (const auto& datagen_func : functions) {
1805 ::arrow::ArrayVector arrays;
1806 std::shared_ptr<Array> arr;
1807 std::shared_ptr<::DataType> type;
1808 datagen_func(total_length, &type, &arr);
1809
1810 int64_t offset = 0;
1811 for (int chunk_size : chunk_sizes) {
1812 arrays.push_back(arr->Slice(offset, chunk_size));
1813 offset += chunk_size;
1814 }
1815
1816 auto field = ::arrow::field("fname", type);
1817 auto schema = ::arrow::schema({field});
1818 auto col = std::make_shared<::arrow::Column>(field, arrays);
1819 auto table = Table::Make(schema, {col});
1820
1821 ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, 2));
1822 ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, 3));
1823 ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, 10));
1824 }
1825}
1826
1827TEST(TestArrowReadWrite, TableWithDuplicateColumns) {
1828 // See ARROW-1974
1829 using ::arrow::ArrayFromVector;
1830
1831 auto f0 = field("duplicate", ::arrow::int8());
1832 auto f1 = field("duplicate", ::arrow::int16());
1833 auto schema = ::arrow::schema({f0, f1});
1834
1835 std::vector<int8_t> a0_values = {1, 2, 3};
1836 std::vector<int16_t> a1_values = {14, 15, 16};
1837
1838 std::shared_ptr<Array> a0, a1;
1839
1840 ArrayFromVector<::arrow::Int8Type, int8_t>(a0_values, &a0);
1841 ArrayFromVector<::arrow::Int16Type, int16_t>(a1_values, &a1);
1842
1843 auto table = Table::Make(schema, {std::make_shared<Column>(f0->name(), a0),
1844 std::make_shared<Column>(f1->name(), a1)});
1845 ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, table->num_rows()));
1846}
1847
1848TEST(TestArrowReadWrite, DictionaryColumnChunkedWrite) {
1849 // This is a regression test for this:
1850 //
1851 // https://issues.apache.org/jira/browse/ARROW-1938
1852 //
1853 // As of the writing of this test, columns of type
1854 // dictionary are written as their raw/expanded values.
1855 // The regression was that the whole column was being
1856 // written for each chunk.
1857 using ::arrow::ArrayFromVector;
1858
1859 std::vector<std::string> values = {"first", "second", "third"};
1860 auto type = ::arrow::utf8();
1861 std::shared_ptr<Array> dict_values;
1862 ArrayFromVector<::arrow::StringType, std::string>(values, &dict_values);
1863
1864 auto dict_type = ::arrow::dictionary(::arrow::int32(), dict_values);
1865 auto f0 = field("dictionary", dict_type);
1866 std::vector<std::shared_ptr<::arrow::Field>> fields;
1867 fields.emplace_back(f0);
1868 auto schema = ::arrow::schema(fields);
1869
1870 std::shared_ptr<Array> f0_values, f1_values;
1871 ArrayFromVector<::arrow::Int32Type, int32_t>({0, 1, 0, 2, 1}, &f0_values);
1872 ArrayFromVector<::arrow::Int32Type, int32_t>({2, 0, 1, 0, 2}, &f1_values);
1873 ::arrow::ArrayVector dict_arrays = {
1874 std::make_shared<::arrow::DictionaryArray>(dict_type, f0_values),
1875 std::make_shared<::arrow::DictionaryArray>(dict_type, f1_values)};
1876
1877 std::vector<std::shared_ptr<::arrow::Column>> columns;
1878 auto column = MakeColumn("column", dict_arrays, false);
1879 columns.emplace_back(column);
1880
1881 auto table = Table::Make(schema, columns);
1882
1883 std::shared_ptr<Table> result;
1884  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(table, true /* use_threads */,
1885                                            // Use a chunk size that is smaller
1886                                            // than the total number of values so
1887                                            // the table is written as several chunks
1888                                            2, {}, &result));
1889
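  // Expected output: both chunks of dictionary indices decoded against the
  // dictionary {"first", "second", "third"} and concatenated.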
1890 std::vector<std::string> expected_values = {"first", "second", "first", "third",
1891 "second", "third", "first", "second",
1892 "first", "third"};
1893 columns.clear();
1894
1895 std::shared_ptr<Array> expected_array;
1896 ArrayFromVector<::arrow::StringType, std::string>(expected_values, &expected_array);
1897
1898 // The column name gets changed on output to the name of the
1899 // field, and it also turns into a nullable column
1900 columns.emplace_back(MakeColumn("dictionary", expected_array, true));
1901
1902 fields.clear();
1903 fields.emplace_back(::arrow::field("dictionary", ::arrow::utf8()));
1904 schema = ::arrow::schema(fields);
1905
1906 auto expected_table = Table::Make(schema, columns);
1907
1908 ::arrow::AssertTablesEqual(*expected_table, *result, false);
1909}
1910
1911TEST(TestArrowWrite, CheckChunkSize) {
1912 const int num_columns = 2;
1913 const int num_rows = 128;
1914  const int64_t chunk_size = 0;  // a zero chunk size is invalid and must raise an error
1915 std::shared_ptr<Table> table;
1916 ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
1917
1918 auto sink = std::make_shared<InMemoryOutputStream>();
1919
1920 ASSERT_RAISES(Invalid,
1921 WriteTable(*table, ::arrow::default_memory_pool(), sink, chunk_size));
1922}
1923
1924class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
1925 protected:
1926  // multiply SMALL_SIZE by 3 so the row count divides evenly by 3
1927 const int NUM_SIMPLE_TEST_ROWS = SMALL_SIZE * 3;
1928 std::shared_ptr<::arrow::Int32Array> values_array_ = nullptr;
1929
1930 void InitReader() {
1931 std::shared_ptr<Buffer> buffer = nested_parquet_->GetBuffer();
1932 ASSERT_OK_NO_THROW(
1933 OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
1934 ::parquet::default_reader_properties(), nullptr, &reader_));
1935 }
1936
1937 void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
1938 nested_parquet_ = std::make_shared<InMemoryOutputStream>();
1939
1940 writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, schema,
1941 default_writer_properties());
1942 row_group_writer_ = writer_->AppendRowGroup();
1943 }
1944
1945 void FinalizeParquetFile() {
1946 row_group_writer_->Close();
1947 writer_->Close();
1948 }
1949
1950 void MakeValues(int num_rows) {
1951 std::shared_ptr<Array> arr;
1952 ASSERT_OK(NullableArray<::arrow::Int32Type>(num_rows, 0, kDefaultSeed, &arr));
1953 values_array_ = std::dynamic_pointer_cast<::arrow::Int32Array>(arr);
1954 }
1955
1956 void WriteColumnData(size_t num_rows, int16_t* def_levels, int16_t* rep_levels,
1957 int32_t* values) {
1958 auto typed_writer =
1959 static_cast<TypedColumnWriter<Int32Type>*>(row_group_writer_->NextColumn());
1960 typed_writer->WriteBatch(num_rows, def_levels, rep_levels, values);
1961 }
1962
1963 void ValidateArray(const Array& array, size_t expected_nulls) {
1964 ASSERT_EQ(array.length(), values_array_->length());
1965 ASSERT_EQ(array.null_count(), expected_nulls);
1966 // Also independently count the nulls
1967 auto local_null_count = 0;
1968 for (int i = 0; i < array.length(); i++) {
1969 if (array.IsNull(i)) {
1970 local_null_count++;
1971 }
1972 }
1973 ASSERT_EQ(local_null_count, expected_nulls);
1974 }
1975
1976 void ValidateColumnArray(const ::arrow::Int32Array& array, size_t expected_nulls) {
1977 ValidateArray(array, expected_nulls);
1978 int j = 0;
1979 for (int i = 0; i < values_array_->length(); i++) {
1980 if (array.IsNull(i)) {
1981 continue;
1982 }
1983 ASSERT_EQ(array.Value(i), values_array_->Value(j));
1984 j++;
1985 }
1986 }
1987
1988 void ValidateTableArrayTypes(const Table& table) {
1989 for (int i = 0; i < table.num_columns(); i++) {
1990 const std::shared_ptr<::arrow::Field> schema_field = table.schema()->field(i);
1991 const std::shared_ptr<Column> column = table.column(i);
1992 // Compare with the column field
1993 ASSERT_TRUE(schema_field->Equals(column->field()));
1994 // Compare with the array type
1995 ASSERT_TRUE(schema_field->type()->Equals(column->data()->chunk(0)->type()));
1996 }
1997 }
1998
1999  // Create a Parquet file with a simple nested schema
2000 void CreateSimpleNestedParquet(Repetition::type struct_repetition) {
2001 std::vector<NodePtr> parquet_fields;
2002    // TODO(itaiin): We are using the Parquet low-level file API to create the nested
2003    // Parquet file; this needs to change once nested writes are implemented
2004
2005 // create the schema:
2006 // <struct_repetition> group group1 {
2007 // required int32 leaf1;
2008 // optional int32 leaf2;
2009 // }
2010 // required int32 leaf3;
2011
2012 parquet_fields.push_back(GroupNode::Make(
2013 "group1", struct_repetition,
2014 {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT32),
2015 PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32)}));
2016 parquet_fields.push_back(
2017 PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT32));
2018
2019 auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);
2020
2021 // Create definition levels for the different columns that contain interleaved
2022 // nulls and values at all nesting levels
2023
2024 // definition levels for optional fields
2025 std::vector<int16_t> leaf1_def_levels(NUM_SIMPLE_TEST_ROWS);
2026 std::vector<int16_t> leaf2_def_levels(NUM_SIMPLE_TEST_ROWS);
2027 std::vector<int16_t> leaf3_def_levels(NUM_SIMPLE_TEST_ROWS);
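    // Per 3-row cycle: row 0 leaves group1 null, row 1 leaves leaf2 null at the
    // primitive level, and row 2 is fully defined; leaf3 is always present.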
2028 for (int i = 0; i < NUM_SIMPLE_TEST_ROWS; i++) {
2029 // leaf1 is required within the optional group1, so it is only null
2030 // when the group is null
2031 leaf1_def_levels[i] = (i % 3 == 0) ? 0 : 1;
2032 // leaf2 is optional, can be null in the primitive (def-level 1) or
2033 // struct level (def-level 0)
2034 leaf2_def_levels[i] = static_cast<int16_t>(i % 3);
2035 // leaf3 is required
2036 leaf3_def_levels[i] = 0;
2037 }
2038
2039 std::vector<int16_t> rep_levels(NUM_SIMPLE_TEST_ROWS, 0);
2040
2041 // Produce values for the columns
2042 MakeValues(NUM_SIMPLE_TEST_ROWS);
2043 int32_t* values = reinterpret_cast<int32_t*>(values_array_->values()->mutable_data());
2044
2045 // Create the actual parquet file
2046 InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node),
2047 NUM_SIMPLE_TEST_ROWS);
2048
2049 // leaf1 column
2050 WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf1_def_levels.data(), rep_levels.data(),
2051 values);
2052 // leaf2 column
2053 WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf2_def_levels.data(), rep_levels.data(),
2054 values);
2055 // leaf3 column
2056 WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf3_def_levels.data(), rep_levels.data(),
2057 values);
2058
2059 FinalizeParquetFile();
2060 InitReader();
2061 }
2062
2063 NodePtr CreateSingleTypedNestedGroup(int index, int depth, int num_children,
2064 Repetition::type node_repetition,
2065 ParquetType::type leaf_type) {
2066 std::vector<NodePtr> children;
2067
2068 for (int i = 0; i < num_children; i++) {
2069 if (depth <= 1) {
2070 children.push_back(PrimitiveNode::Make("leaf", node_repetition, leaf_type));
2071 } else {
2072 children.push_back(CreateSingleTypedNestedGroup(i, depth - 1, num_children,
2073 node_repetition, leaf_type));
2074 }
2075 }
2076
2077 std::stringstream ss;
2078 ss << "group-" << depth << "-" << index;
2079 return NodePtr(GroupNode::Make(ss.str(), node_repetition, children));
2080 }
2081
2082 // A deeply nested schema
2083 void CreateMultiLevelNestedParquet(int num_trees, int tree_depth, int num_children,
2084 int num_rows, Repetition::type node_repetition) {
2085 // Create the schema
2086 std::vector<NodePtr> parquet_fields;
2087 for (int i = 0; i < num_trees; i++) {
2088 parquet_fields.push_back(CreateSingleTypedNestedGroup(
2089 i, tree_depth, num_children, node_repetition, ParquetType::INT32));
2090 }
2091 auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);
2092
2093 int num_columns = num_trees * static_cast<int>((std::pow(num_children, tree_depth)));
2094
2095 std::vector<int16_t> def_levels;
2096 std::vector<int16_t> rep_levels;
2097
2098 int num_levels = 0;
2099 while (num_levels < num_rows) {
2100 if (node_repetition == Repetition::REQUIRED) {
2101 def_levels.push_back(0); // all are required
2102 } else {
2103 int16_t level = static_cast<int16_t>(num_levels % (tree_depth + 2));
2104 def_levels.push_back(level); // all are optional
2105 }
2106      rep_levels.push_back(0);  // no fields are repeated
2107 ++num_levels;
2108 }
2109
2110 // Produce values for the columns
2111 MakeValues(num_rows);
2112 int32_t* values = reinterpret_cast<int32_t*>(values_array_->values()->mutable_data());
2113
2114 // Create the actual parquet file
2115 InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node), num_rows);
2116
2117 for (int i = 0; i < num_columns; i++) {
2118 WriteColumnData(num_rows, def_levels.data(), rep_levels.data(), values);
2119 }
2120 FinalizeParquetFile();
2121 InitReader();
2122 }
2123
2124 class DeepParquetTestVisitor : public ArrayVisitor {
2125 public:
2126 DeepParquetTestVisitor(Repetition::type node_repetition,
2127 std::shared_ptr<::arrow::Int32Array> expected)
2128 : node_repetition_(node_repetition), expected_(expected) {}
2129
2130 Status Validate(std::shared_ptr<Array> tree) { return tree->Accept(this); }
2131
2132 virtual Status Visit(const ::arrow::Int32Array& array) {
2133 if (node_repetition_ == Repetition::REQUIRED) {
2134 if (!array.Equals(expected_)) {
2135 return Status::Invalid("leaf array data mismatch");
2136 }
2137 } else if (node_repetition_ == Repetition::OPTIONAL) {
2138 if (array.length() != expected_->length()) {
2139 return Status::Invalid("Bad leaf array length");
2140 }
2141        // expect only 1 defined value every `depth` rows
2142 if (array.null_count() != SMALL_SIZE) {
2143 return Status::Invalid("Unexpected null count");
2144 }
2145 } else {
2146 return Status::NotImplemented("Unsupported repetition");
2147 }
2148 return Status::OK();
2149 }
2150
2151 virtual Status Visit(const ::arrow::StructArray& array) {
2152 for (int32_t i = 0; i < array.num_fields(); ++i) {
2153 auto child = array.field(i);
2154 if (node_repetition_ == Repetition::REQUIRED) {
2155 RETURN_NOT_OK(child->Accept(this));
2156 } else if (node_repetition_ == Repetition::OPTIONAL) {
2157          // Null count must be a multiple of SMALL_SIZE
2158 if (array.null_count() % SMALL_SIZE != 0) {
2159 return Status::Invalid("Unexpected struct null count");
2160 }
2161 } else {
2162 return Status::NotImplemented("Unsupported repetition");
2163 }
2164 }
2165 return Status::OK();
2166 }
2167
2168 private:
2169 Repetition::type node_repetition_;
2170 std::shared_ptr<::arrow::Int32Array> expected_;
2171 };
2172
2173 std::shared_ptr<InMemoryOutputStream> nested_parquet_;
2174 std::unique_ptr<FileReader> reader_;
2175 std::unique_ptr<ParquetFileWriter> writer_;
2176 RowGroupWriter* row_group_writer_;
2177};
2178
2179TEST_F(TestNestedSchemaRead, ReadIntoTableFull) {
2180 ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::OPTIONAL));
2181
2182 std::shared_ptr<Table> table;
2183 ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
2184 ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
2185 ASSERT_EQ(table->num_columns(), 2);
2186 ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
2187 ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));
2188
2189 auto struct_field_array =
2190 std::static_pointer_cast<::arrow::StructArray>(table->column(0)->data()->chunk(0));
2191 auto leaf1_array =
2192 std::static_pointer_cast<::arrow::Int32Array>(struct_field_array->field(0));
2193 auto leaf2_array =
2194 std::static_pointer_cast<::arrow::Int32Array>(struct_field_array->field(1));
2195 auto leaf3_array =
2196 std::static_pointer_cast<::arrow::Int32Array>(table->column(1)->data()->chunk(0));
2197
2198 // validate struct and leaf arrays
2199
2200 // validate struct array
2201 ASSERT_NO_FATAL_FAILURE(ValidateArray(*struct_field_array, NUM_SIMPLE_TEST_ROWS / 3));
2202 // validate leaf1
2203 ASSERT_NO_FATAL_FAILURE(ValidateColumnArray(*leaf1_array, NUM_SIMPLE_TEST_ROWS / 3));
2204 // validate leaf2
2205 ASSERT_NO_FATAL_FAILURE(
2206 ValidateColumnArray(*leaf2_array, NUM_SIMPLE_TEST_ROWS * 2 / 3));
2207 // validate leaf3
2208 ASSERT_NO_FATAL_FAILURE(ValidateColumnArray(*leaf3_array, 0));
2209}
2210
2211TEST_F(TestNestedSchemaRead, ReadTablePartial) {
2212 ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::OPTIONAL));
2213 std::shared_ptr<Table> table;
2214
2215 // columns: {group1.leaf1, leaf3}
2216 ASSERT_OK_NO_THROW(reader_->ReadTable({0, 2}, &table));
2217 ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
2218 ASSERT_EQ(table->num_columns(), 2);
2219 ASSERT_EQ(table->schema()->field(0)->name(), "group1");
2220 ASSERT_EQ(table->schema()->field(1)->name(), "leaf3");
2221 ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1);
2222 ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));
2223
2224 // columns: {group1.leaf1, group1.leaf2}
2225 ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table));
2226 ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
2227 ASSERT_EQ(table->num_columns(), 1);
2228 ASSERT_EQ(table->schema()->field(0)->name(), "group1");
2229 ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
2230 ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));
2231
2232 // columns: {leaf3}
2233 ASSERT_OK_NO_THROW(reader_->ReadTable({2}, &table));
2234 ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
2235 ASSERT_EQ(table->num_columns(), 1);
2236 ASSERT_EQ(table->schema()->field(0)->name(), "leaf3");
2237 ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 0);
2238 ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));
2239
2240 // Test with different ordering
2241 ASSERT_OK_NO_THROW(reader_->ReadTable({2, 0}, &table));
2242 ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
2243 ASSERT_EQ(table->num_columns(), 2);
2244 ASSERT_EQ(table->schema()->field(0)->name(), "leaf3");
2245 ASSERT_EQ(table->schema()->field(1)->name(), "group1");
2246 ASSERT_EQ(table->schema()->field(1)->type()->num_children(), 1);
2247 ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));
2248}
2249
2250TEST_F(TestNestedSchemaRead, StructAndListTogetherUnsupported) {
2251 ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::REPEATED));
2252 std::shared_ptr<Table> table;
2253 ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table));
2254}
2255
2256TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
2257#ifdef PARQUET_VALGRIND
2258 const int num_trees = 3;
2259 const int depth = 3;
2260#else
2261 const int num_trees = 5;
2262 const int depth = 5;
2263#endif
2264 const int num_children = 3;
2265 int num_rows = SMALL_SIZE * (depth + 2);
2266 ASSERT_NO_FATAL_FAILURE(CreateMultiLevelNestedParquet(num_trees, depth, num_children,
2267 num_rows, GetParam()));
2268 std::shared_ptr<Table> table;
2269 ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
2270 ASSERT_EQ(table->num_columns(), num_trees);
2271 ASSERT_EQ(table->num_rows(), num_rows);
2272
2273 DeepParquetTestVisitor visitor(GetParam(), values_array_);
2274 for (int i = 0; i < table->num_columns(); i++) {
2275 auto tree = table->column(i)->data()->chunk(0);
2276 ASSERT_OK_NO_THROW(visitor.Validate(tree));
2277 }
2278}
2279
2280INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
2281 ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
2282
2283TEST(TestImpalaConversion, ArrowTimestampToImpalaTimestamp) {
2284 // June 20, 2017 16:32:56 and 123456789 nanoseconds
2285 int64_t nanoseconds = INT64_C(1497976376123456789);
2286
2287 Int96 calculated;
2288
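  // Impala's INT96 timestamp stores nanoseconds within the day in the first two
  // 32-bit words and the Julian day number in the third:
  // 13871 * 2^32 + 632093973 = 59576123456789 ns == 16:32:56.123456789,
  // and Julian day 2457925 == 2017-06-20.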
2289 Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};
2290 internal::NanosecondsToImpalaTimestamp(nanoseconds, &calculated);
2291 ASSERT_EQ(expected, calculated);
2292}
2293
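// Try to read a file from the test data directory. When should_succeed is false,
// a ParquetException thrown during reading is tolerated rather than failing the
// test.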
2294void TryReadDataFile(const std::string& testing_file_path, bool should_succeed = true) {
2295 std::string dir_string(test::get_data_dir());
2296 std::stringstream ss;
2297 ss << dir_string << "/" << testing_file_path;
2298 auto path = ss.str();
2299
2300 auto pool = ::arrow::default_memory_pool();
2301
2302 std::unique_ptr<FileReader> arrow_reader;
2303 try {
2304 arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false)));
2305 std::shared_ptr<::arrow::Table> table;
2306 ASSERT_OK(arrow_reader->ReadTable(&table));
2307 } catch (const ParquetException& e) {
2308 if (should_succeed) {
2309 FAIL() << "Exception thrown when reading file: " << e.what();
2310 }
2311 }
2312}
2313
2314TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
2315 // PARQUET-995
2316 TryReadDataFile("alltypes_plain.parquet");
2317}
2318
2319TEST(TestArrowReaderAdHoc, CorruptedSchema) {
2320 // PARQUET-1481
2321 TryReadDataFile("bad_data/PARQUET-1481.parquet", false /* should_succeed */);
2322}
2323
2324class TestArrowReaderAdHocSparkAndHvr
2325 : public ::testing::TestWithParam<
2326 std::tuple<std::string, std::shared_ptr<::DataType>>> {};
2327
2328TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) {
2329 std::string path(test::get_data_dir());
2330
2331 std::string filename;
2332 std::shared_ptr<::DataType> decimal_type;
2333 std::tie(filename, decimal_type) = GetParam();
2334
2335 path += "/" + filename;
2336 ASSERT_GT(path.size(), 0);
2337
2338 auto pool = ::arrow::default_memory_pool();
2339
2340 std::unique_ptr<FileReader> arrow_reader;
2341 ASSERT_NO_THROW(
2342 arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false))));
2343 std::shared_ptr<::arrow::Table> table;
2344 ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
2345
2346 ASSERT_EQ(1, table->num_columns());
2347
2348 constexpr int32_t expected_length = 24;
2349
2350 auto value_column = table->column(0);
2351 ASSERT_EQ(expected_length, value_column->length());
2352
2353 auto raw_array = value_column->data();
2354 ASSERT_EQ(1, raw_array->num_chunks());
2355
2356 auto chunk = raw_array->chunk(0);
2357
2358 std::shared_ptr<Array> expected_array;
2359
2360 ::arrow::Decimal128Builder builder(decimal_type, pool);
2361
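  // Build the expected column: the decimal sequence 1.00, 2.00, ..., 24.00,
  // i.e. unscaled values 100, 200, ..., 2400 at scale 2.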
2362 for (int32_t i = 0; i < expected_length; ++i) {
2363 ::arrow::Decimal128 value((i + 1) * 100);
2364 ASSERT_OK(builder.Append(value));
2365 }
2366 ASSERT_OK(builder.Finish(&expected_array));
2367
2368 AssertArraysEqual(*expected_array, *chunk);
2369}
2370
2371INSTANTIATE_TEST_CASE_P(
2372 ReadDecimals, TestArrowReaderAdHocSparkAndHvr,
2373 ::testing::Values(
2374 std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)),
2375 std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)),
2376 std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 2)),
2377 std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2)),
2378 std::make_tuple("byte_array_decimal.parquet", ::arrow::decimal(4, 2))));
2379
2380} // namespace arrow
2381
2382} // namespace parquet
2383