1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #ifdef _MSC_VER |
19 | #pragma warning(push) |
20 | // Disable forcing value to bool warnings |
21 | #pragma warning(disable : 4800) |
22 | #endif |
23 | |
24 | #include "gtest/gtest.h" |
25 | |
26 | #include <arrow/compute/api.h> |
27 | #include <cstdint> |
28 | #include <functional> |
29 | #include <sstream> |
30 | #include <vector> |
31 | |
32 | #include "arrow/api.h" |
33 | #include "arrow/test-util.h" |
34 | #include "arrow/type_traits.h" |
35 | #include "arrow/util/decimal.h" |
36 | |
37 | #include "parquet/api/reader.h" |
38 | #include "parquet/api/writer.h" |
39 | |
40 | #include "parquet/arrow/reader.h" |
41 | #include "parquet/arrow/schema.h" |
42 | #include "parquet/arrow/test-util.h" |
43 | #include "parquet/arrow/writer.h" |
44 | #include "parquet/file_writer.h" |
45 | #include "parquet/util/test-common.h" |
46 | |
47 | using arrow::Array; |
48 | using arrow::ArrayVisitor; |
49 | using arrow::Buffer; |
50 | using arrow::ChunkedArray; |
51 | using arrow::Column; |
52 | using arrow::DataType; |
53 | using arrow::default_memory_pool; |
54 | using arrow::ListArray; |
55 | using arrow::PrimitiveArray; |
56 | using arrow::ResizableBuffer; |
57 | using arrow::Status; |
58 | using arrow::Table; |
59 | using arrow::TimeUnit; |
60 | using arrow::compute::Datum; |
61 | using arrow::compute::DictionaryEncode; |
62 | using arrow::compute::FunctionContext; |
63 | using arrow::io::BufferReader; |
64 | |
65 | using arrow::randint; |
66 | using arrow::random_is_valid; |
67 | |
68 | using ArrowId = ::arrow::Type; |
69 | using ParquetType = parquet::Type; |
70 | using parquet::arrow::FromParquetSchema; |
71 | using parquet::schema::GroupNode; |
72 | using parquet::schema::NodePtr; |
73 | using parquet::schema::PrimitiveNode; |
74 | |
75 | using ColumnVector = std::vector<std::shared_ptr<arrow::Column>>; |
76 | |
77 | namespace parquet { |
78 | namespace arrow { |
79 | |
// Number of rows used by most single-column roundtrip tests.
static constexpr int SMALL_SIZE = 100;
#ifdef PARQUET_VALGRIND
// Valgrind runs are slow, so shrink the "large" table size there.
static constexpr int LARGE_SIZE = 1000;
#else
static constexpr int LARGE_SIZE = 10000;
#endif

// Fixed RNG seed so randomly generated test data is reproducible.
static constexpr uint32_t kDefaultSeed = 0;
88 | |
89 | LogicalType::type get_logical_type(const ::DataType& type) { |
90 | switch (type.id()) { |
91 | case ArrowId::UINT8: |
92 | return LogicalType::UINT_8; |
93 | case ArrowId::INT8: |
94 | return LogicalType::INT_8; |
95 | case ArrowId::UINT16: |
96 | return LogicalType::UINT_16; |
97 | case ArrowId::INT16: |
98 | return LogicalType::INT_16; |
99 | case ArrowId::UINT32: |
100 | return LogicalType::UINT_32; |
101 | case ArrowId::INT32: |
102 | return LogicalType::INT_32; |
103 | case ArrowId::UINT64: |
104 | return LogicalType::UINT_64; |
105 | case ArrowId::INT64: |
106 | return LogicalType::INT_64; |
107 | case ArrowId::STRING: |
108 | return LogicalType::UTF8; |
109 | case ArrowId::DATE32: |
110 | return LogicalType::DATE; |
111 | case ArrowId::DATE64: |
112 | return LogicalType::DATE; |
113 | case ArrowId::TIMESTAMP: { |
114 | const auto& ts_type = static_cast<const ::arrow::TimestampType&>(type); |
115 | switch (ts_type.unit()) { |
116 | case TimeUnit::MILLI: |
117 | return LogicalType::TIMESTAMP_MILLIS; |
118 | case TimeUnit::MICRO: |
119 | return LogicalType::TIMESTAMP_MICROS; |
120 | default: |
121 | DCHECK(false) << "Only MILLI and MICRO units supported for Arrow timestamps " |
122 | "with Parquet." ; |
123 | } |
124 | } |
125 | case ArrowId::TIME32: |
126 | return LogicalType::TIME_MILLIS; |
127 | case ArrowId::TIME64: |
128 | return LogicalType::TIME_MICROS; |
129 | case ArrowId::DICTIONARY: { |
130 | const ::arrow::DictionaryType& dict_type = |
131 | static_cast<const ::arrow::DictionaryType&>(type); |
132 | return get_logical_type(*dict_type.dictionary()->type()); |
133 | } |
134 | case ArrowId::DECIMAL: |
135 | return LogicalType::DECIMAL; |
136 | default: |
137 | break; |
138 | } |
139 | return LogicalType::NONE; |
140 | } |
141 | |
142 | ParquetType::type get_physical_type(const ::DataType& type) { |
143 | switch (type.id()) { |
144 | case ArrowId::BOOL: |
145 | return ParquetType::BOOLEAN; |
146 | case ArrowId::UINT8: |
147 | case ArrowId::INT8: |
148 | case ArrowId::UINT16: |
149 | case ArrowId::INT16: |
150 | case ArrowId::UINT32: |
151 | case ArrowId::INT32: |
152 | return ParquetType::INT32; |
153 | case ArrowId::UINT64: |
154 | case ArrowId::INT64: |
155 | return ParquetType::INT64; |
156 | case ArrowId::FLOAT: |
157 | return ParquetType::FLOAT; |
158 | case ArrowId::DOUBLE: |
159 | return ParquetType::DOUBLE; |
160 | case ArrowId::BINARY: |
161 | return ParquetType::BYTE_ARRAY; |
162 | case ArrowId::STRING: |
163 | return ParquetType::BYTE_ARRAY; |
164 | case ArrowId::FIXED_SIZE_BINARY: |
165 | case ArrowId::DECIMAL: |
166 | return ParquetType::FIXED_LEN_BYTE_ARRAY; |
167 | case ArrowId::DATE32: |
168 | return ParquetType::INT32; |
169 | case ArrowId::DATE64: |
170 | // Convert to date32 internally |
171 | return ParquetType::INT32; |
172 | case ArrowId::TIME32: |
173 | return ParquetType::INT32; |
174 | case ArrowId::TIME64: |
175 | return ParquetType::INT64; |
176 | case ArrowId::TIMESTAMP: |
177 | return ParquetType::INT64; |
178 | case ArrowId::DICTIONARY: { |
179 | const ::arrow::DictionaryType& dict_type = |
180 | static_cast<const ::arrow::DictionaryType&>(type); |
181 | return get_physical_type(*dict_type.dictionary()->type()); |
182 | } |
183 | default: |
184 | break; |
185 | } |
186 | DCHECK(false) << "cannot reach this code" ; |
187 | return ParquetType::INT32; |
188 | } |
189 | |
// Per-Arrow-type traits used by the typed tests below: the Parquet physical
// type the values are stored as, plus a representative non-null test value.
template <typename TestType>
struct test_traits {};

template <>
struct test_traits<::arrow::BooleanType> {
  static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
  static uint8_t const value;
};

const uint8_t test_traits<::arrow::BooleanType>::value(1);

template <>
struct test_traits<::arrow::UInt8Type> {
  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
  static uint8_t const value;
};

const uint8_t test_traits<::arrow::UInt8Type>::value(64);

template <>
struct test_traits<::arrow::Int8Type> {
  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
  static int8_t const value;
};

const int8_t test_traits<::arrow::Int8Type>::value(-64);

template <>
struct test_traits<::arrow::UInt16Type> {
  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
  static uint16_t const value;
};

const uint16_t test_traits<::arrow::UInt16Type>::value(1024);

template <>
struct test_traits<::arrow::Int16Type> {
  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
  static int16_t const value;
};

const int16_t test_traits<::arrow::Int16Type>::value(-1024);

template <>
struct test_traits<::arrow::UInt32Type> {
  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
  static uint32_t const value;
};

const uint32_t test_traits<::arrow::UInt32Type>::value(1024);

template <>
struct test_traits<::arrow::Int32Type> {
  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
  static int32_t const value;
};

const int32_t test_traits<::arrow::Int32Type>::value(-1024);

template <>
struct test_traits<::arrow::UInt64Type> {
  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
  static uint64_t const value;
};

const uint64_t test_traits<::arrow::UInt64Type>::value(1024);

template <>
struct test_traits<::arrow::Int64Type> {
  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
  static int64_t const value;
};

const int64_t test_traits<::arrow::Int64Type>::value(-1024);

template <>
struct test_traits<::arrow::TimestampType> {
  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
  static int64_t const value;
};

const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);

template <>
struct test_traits<::arrow::Date32Type> {
  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
  static int32_t const value;
};

const int32_t test_traits<::arrow::Date32Type>::value(170000);

template <>
struct test_traits<::arrow::FloatType> {
  static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
  static float const value;
};

const float test_traits<::arrow::FloatType>::value(2.1f);

template <>
struct test_traits<::arrow::DoubleType> {
  static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
  static double const value;
};

const double test_traits<::arrow::DoubleType>::value(4.2);

template <>
struct test_traits<::arrow::StringType> {
  static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
  static std::string const value;
};

template <>
struct test_traits<::arrow::BinaryType> {
  static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
  static std::string const value;
};

template <>
struct test_traits<::arrow::FixedSizeBinaryType> {
  static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY;
  static std::string const value;
};

// NOTE: the binary value deliberately starts with an embedded NUL byte.
const std::string test_traits<::arrow::StringType>::value("Test");              // NOLINT
const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03");  // NOLINT
const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed");    // NOLINT

// Parquet DataType corresponding to an Arrow type's physical storage.
template <typename T>
using ParquetDataType = DataType<test_traits<T>::parquet_enum>;

// Typed column writer for the Parquet physical type of Arrow type T.
template <typename T>
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
324 | |
325 | void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t row_group_size, |
326 | const std::shared_ptr<ArrowWriterProperties>& arrow_properties, |
327 | std::shared_ptr<Buffer>* out) { |
328 | auto sink = std::make_shared<InMemoryOutputStream>(); |
329 | |
330 | ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink, |
331 | row_group_size, default_writer_properties(), |
332 | arrow_properties)); |
333 | *out = sink->GetBuffer(); |
334 | } |
335 | |
336 | void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual) { |
337 | ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal" ; |
338 | if (!actual.Equals(expected)) { |
339 | std::stringstream pp_result; |
340 | std::stringstream pp_expected; |
341 | |
342 | for (int i = 0; i < actual.num_chunks(); ++i) { |
343 | auto c1 = actual.chunk(i); |
344 | auto c2 = expected.chunk(i); |
345 | if (!c1->Equals(*c2)) { |
346 | EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result)); |
347 | EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected)); |
348 | FAIL() << "Chunk " << i << " Got: " << pp_result.str() |
349 | << "\nExpected: " << pp_expected.str(); |
350 | } |
351 | } |
352 | } |
353 | } |
354 | |
355 | void PrintColumn(const Column& col, std::stringstream* ss) { |
356 | const ChunkedArray& carr = *col.data(); |
357 | for (int i = 0; i < carr.num_chunks(); ++i) { |
358 | auto c1 = carr.chunk(i); |
359 | *ss << "Chunk " << i << std::endl; |
360 | EXPECT_OK(::arrow::PrettyPrint(*c1, 0, ss)); |
361 | *ss << std::endl; |
362 | } |
363 | } |
364 | |
365 | void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads, |
366 | int64_t row_group_size, const std::vector<int>& column_subset, |
367 | std::shared_ptr<Table>* out, |
368 | const std::shared_ptr<ArrowWriterProperties>& arrow_properties = |
369 | default_arrow_writer_properties()) { |
370 | std::shared_ptr<Buffer> buffer; |
371 | ASSERT_NO_FATAL_FAILURE( |
372 | WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer)); |
373 | |
374 | std::unique_ptr<FileReader> reader; |
375 | ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
376 | ::arrow::default_memory_pool(), |
377 | ::parquet::default_reader_properties(), nullptr, &reader)); |
378 | |
379 | reader->set_use_threads(use_threads); |
380 | |
381 | if (column_subset.size() > 0) { |
382 | ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out)); |
383 | } else { |
384 | // Read everything |
385 | ASSERT_OK_NO_THROW(reader->ReadTable(out)); |
386 | } |
387 | } |
388 | |
389 | void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size, |
390 | const std::shared_ptr<ArrowWriterProperties>& arrow_properties = |
391 | default_arrow_writer_properties()) { |
392 | std::shared_ptr<Table> result; |
393 | DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result, |
394 | arrow_properties); |
395 | ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result, false)); |
396 | } |
397 | |
398 | static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::DataType& type, |
399 | Repetition::type repetition) { |
400 | int32_t byte_width = -1; |
401 | int32_t precision = -1; |
402 | int32_t scale = -1; |
403 | |
404 | switch (type.id()) { |
405 | case ::arrow::Type::DICTIONARY: { |
406 | const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type); |
407 | const ::DataType& values_type = *dict_type.dictionary()->type(); |
408 | switch (values_type.id()) { |
409 | case ::arrow::Type::FIXED_SIZE_BINARY: |
410 | byte_width = |
411 | static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width(); |
412 | break; |
413 | case ::arrow::Type::DECIMAL: { |
414 | const auto& decimal_type = |
415 | static_cast<const ::arrow::Decimal128Type&>(values_type); |
416 | precision = decimal_type.precision(); |
417 | scale = decimal_type.scale(); |
418 | byte_width = DecimalSize(precision); |
419 | } break; |
420 | default: |
421 | break; |
422 | } |
423 | } break; |
424 | case ::arrow::Type::FIXED_SIZE_BINARY: |
425 | byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width(); |
426 | break; |
427 | case ::arrow::Type::DECIMAL: { |
428 | const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type); |
429 | precision = decimal_type.precision(); |
430 | scale = decimal_type.scale(); |
431 | byte_width = DecimalSize(precision); |
432 | } break; |
433 | default: |
434 | break; |
435 | } |
436 | auto pnode = PrimitiveNode::Make("column1" , repetition, get_physical_type(type), |
437 | get_logical_type(type), byte_width, precision, scale); |
438 | NodePtr node_ = |
439 | GroupNode::Make("schema" , Repetition::REQUIRED, std::vector<NodePtr>({pnode})); |
440 | return std::static_pointer_cast<GroupNode>(node_); |
441 | } |
442 | |
// Typed fixture exercising single-column Parquet <-> Arrow roundtrips for
// every entry in TestTypes. Data is written to the in-memory sink_ member.
template <typename TestType>
class TestParquetIO : public ::testing::Test {
 public:
  virtual void SetUp() {}

  // Reset sink_ to a fresh in-memory stream and open a file writer on it.
  std::unique_ptr<ParquetFileWriter> MakeWriter(
      const std::shared_ptr<GroupNode>& schema) {
    sink_ = std::make_shared<InMemoryOutputStream>();
    return ParquetFileWriter::Open(sink_, schema);
  }

  // Open a FileReader over the bytes accumulated in sink_.
  void ReaderFromSink(std::unique_ptr<FileReader>* out) {
    std::shared_ptr<Buffer> buffer = sink_->GetBuffer();
    ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
                                ::arrow::default_memory_pool(),
                                ::parquet::default_reader_properties(), nullptr, out));
  }

  // Read column 0 as a single chunk (up to SMALL_SIZE values) into *out.
  void ReadSingleColumnFile(std::unique_ptr<FileReader> file_reader,
                            std::shared_ptr<Array>* out) {
    std::unique_ptr<ColumnReader> column_reader;
    ASSERT_OK_NO_THROW(file_reader->GetColumn(0, &column_reader));
    ASSERT_NE(nullptr, column_reader.get());

    std::shared_ptr<ChunkedArray> chunked_out;
    ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, &chunked_out));

    ASSERT_EQ(1, chunked_out->num_chunks());
    *out = chunked_out->chunk(0);
    ASSERT_NE(nullptr, out->get());
  }

  // Read column 0 back from sink_ and assert it equals `values`.
  void ReadAndCheckSingleColumnFile(const Array& values) {
    std::shared_ptr<Array> out;

    std::unique_ptr<FileReader> reader;
    ReaderFromSink(&reader);
    ReadSingleColumnFile(std::move(reader), &out);

    AssertArraysEqual(values, *out);
  }

  // Read the whole file into a Table, asserting it carries no key-value
  // metadata (none of these tests write any).
  void ReadTableFromFile(std::unique_ptr<FileReader> reader,
                         std::shared_ptr<Table>* out) {
    ASSERT_OK_NO_THROW(reader->ReadTable(out));
    auto key_value_metadata =
        reader->parquet_reader()->metadata()->key_value_metadata().get();
    ASSERT_EQ(nullptr, key_value_metadata);
    ASSERT_NE(nullptr, out->get());
  }

  // Build a one-column table of lists of TestType values, exercising sliced
  // inputs (both the values child and the list array are sliced).
  void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements,
                        int64_t null_count, std::shared_ptr<Table>* out) {
    std::shared_ptr<Array> values;
    ASSERT_OK(NullableArray<TestType>(size * size, nullable_elements ? null_count : 0,
                                      kDefaultSeed, &values));
    // Also test that slice offsets are respected
    values = values->Slice(5, values->length() - 5);
    std::shared_ptr<ListArray> lists;
    ASSERT_OK(MakeListArray(values, size, nullable_lists ? null_count : 0,
                            nullable_elements, &lists));
    *out = MakeSimpleTable(lists->Slice(3, size - 6), nullable_lists);
  }

  // Prepare table of empty lists, with null values array (ARROW-2744)
  void PrepareEmptyListsTable(int64_t size, std::shared_ptr<Table>* out) {
    std::shared_ptr<Array> lists;
    ASSERT_OK(MakeEmptyListsArray(size, &lists));
    *out = MakeSimpleTable(lists, true /* nullable_lists */);
  }

  // Build a one-column table of list<list<TestType>> with the requested
  // nullability at each nesting level.
  void PrepareListOfListTable(int64_t size, bool nullable_parent_lists,
                              bool nullable_lists, bool nullable_elements,
                              int64_t null_count, std::shared_ptr<Table>* out) {
    std::shared_ptr<Array> values;
    ASSERT_OK(NullableArray<TestType>(size * 6, nullable_elements ? null_count : 0,
                                      kDefaultSeed, &values));
    std::shared_ptr<ListArray> lists;
    ASSERT_OK(MakeListArray(values, size * 3, nullable_lists ? null_count : 0,
                            nullable_elements, &lists));
    std::shared_ptr<ListArray> parent_lists;
    ASSERT_OK(MakeListArray(lists, size, nullable_parent_lists ? null_count : 0,
                            nullable_lists, &parent_lists));
    *out = MakeSimpleTable(parent_lists, nullable_parent_lists);
  }

  // Read sink_ as a one-column table and assert its single chunk equals
  // `values`.
  void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
    std::shared_ptr<::arrow::Table> out;
    std::unique_ptr<FileReader> reader;
    ReaderFromSink(&reader);
    ReadTableFromFile(std::move(reader), &out);
    ASSERT_EQ(1, out->num_columns());
    ASSERT_EQ(values->length(), out->num_rows());

    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
    ASSERT_EQ(1, chunked_array->num_chunks());
    auto result = chunked_array->chunk(0);

    AssertArraysEqual(*values, *result);
  }

  // Roundtrip `table` in a single row group and assert equality.
  void CheckRoundTrip(const std::shared_ptr<Table>& table) {
    CheckSimpleRoundtrip(table, table->num_rows());
  }

  // Write `values` as one row group through a fresh writer on sink_.
  template <typename ArrayType>
  void WriteColumn(const std::shared_ptr<GroupNode>& schema,
                   const std::shared_ptr<ArrayType>& values) {
    FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema));
    ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*values));
    ASSERT_OK_NO_THROW(writer.Close());
    // writer.Close() should be idempotent
    ASSERT_OK_NO_THROW(writer.Close());
  }

  // In-memory output written to by MakeWriter()/WriteTable calls.
  std::shared_ptr<InMemoryOutputStream> sink_;
};
561 | |
// We have separate tests for UInt32Type as this is currently the only type
// where a roundtrip does not yield the identical Array structure.
// There we write an UInt32 Array but receive an Int64 Array as result for
// Parquet version 1.0.

// Types instantiated for TestParquetIO. The decimal precisions span the
// Decimal128 range (1 through the maximum of 38) to cover a spread of
// FIXED_LEN_BYTE_ARRAY widths.
typedef ::testing::Types<
    ::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type,
    ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type,
    ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType,
    ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>,
    DecimalWithPrecisionAndScale<3>, DecimalWithPrecisionAndScale<5>,
    DecimalWithPrecisionAndScale<7>, DecimalWithPrecisionAndScale<10>,
    DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<15>,
    DecimalWithPrecisionAndScale<17>, DecimalWithPrecisionAndScale<19>,
    DecimalWithPrecisionAndScale<22>, DecimalWithPrecisionAndScale<23>,
    DecimalWithPrecisionAndScale<24>, DecimalWithPrecisionAndScale<27>,
    DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<32>,
    DecimalWithPrecisionAndScale<34>, DecimalWithPrecisionAndScale<38>>
    TestTypes;

TYPED_TEST_CASE(TestParquetIO, TestTypes);
583 | |
584 | TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { |
585 | std::shared_ptr<Array> values; |
586 | ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); |
587 | |
588 | std::shared_ptr<GroupNode> schema = |
589 | MakeSimpleSchema(*values->type(), Repetition::REQUIRED); |
590 | ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, values)); |
591 | |
592 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); |
593 | } |
594 | |
595 | TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { |
596 | std::shared_ptr<Array> values; |
597 | ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); |
598 | std::shared_ptr<Table> table = MakeSimpleTable(values, false); |
599 | this->sink_ = std::make_shared<InMemoryOutputStream>(); |
600 | ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
601 | values->length(), default_writer_properties())); |
602 | |
603 | std::shared_ptr<Table> out; |
604 | std::unique_ptr<FileReader> reader; |
605 | ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
606 | ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); |
607 | ASSERT_EQ(1, out->num_columns()); |
608 | ASSERT_EQ(100, out->num_rows()); |
609 | |
610 | std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
611 | ASSERT_EQ(1, chunked_array->num_chunks()); |
612 | |
613 | AssertArraysEqual(*values, *chunked_array->chunk(0)); |
614 | } |
615 | |
616 | TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { |
617 | // This also tests max_definition_level = 1 |
618 | std::shared_ptr<Array> values; |
619 | |
620 | ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); |
621 | |
622 | std::shared_ptr<GroupNode> schema = |
623 | MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); |
624 | ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, values)); |
625 | |
626 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); |
627 | } |
628 | |
// Dictionary-encode a nullable array, write the dictionary array, and expect
// the dense (decoded) values when reading the file back.
TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) {
  // Skip tests for BOOL as we don't create dictionaries for it.
  if (TypeParam::type_id == ::arrow::Type::BOOL) {
    return;
  }

  std::shared_ptr<Array> values;

  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));

  Datum out;
  FunctionContext ctx(default_memory_pool());
  ASSERT_OK(DictionaryEncode(&ctx, Datum(values), &out));
  std::shared_ptr<Array> dict_values = MakeArray(out.array());
  std::shared_ptr<GroupNode> schema =
      MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
  ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, dict_values));

  // Reading decodes the dictionary, so compare against the original values.
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
}
649 | |
650 | TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) { |
651 | std::shared_ptr<Array> values; |
652 | ASSERT_OK(NonNullArray<TypeParam>(2 * SMALL_SIZE, &values)); |
653 | std::shared_ptr<GroupNode> schema = |
654 | MakeSimpleSchema(*values->type(), Repetition::REQUIRED); |
655 | |
656 | std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); |
657 | ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); |
658 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); |
659 | |
660 | // Slice offset 1 higher |
661 | sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE); |
662 | ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); |
663 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); |
664 | } |
665 | |
666 | TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) { |
667 | std::shared_ptr<Array> values; |
668 | ASSERT_OK(NullableArray<TypeParam>(2 * SMALL_SIZE, SMALL_SIZE, kDefaultSeed, &values)); |
669 | std::shared_ptr<GroupNode> schema = |
670 | MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); |
671 | |
672 | std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); |
673 | ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); |
674 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); |
675 | |
676 | // Slice offset 1 higher, thus different null bitmap. |
677 | sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE); |
678 | ASSERT_NO_FATAL_FAILURE(this->WriteColumn(schema, sliced_values)); |
679 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*sliced_values)); |
680 | } |
681 | |
// Roundtrip a nullable one-column table through the Table API.
TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
  // This also tests max_definition_level = 1
  std::shared_ptr<Array> values;

  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
  ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
}
690 | |
// Roundtrip lists that are all empty but carry a null values child (ARROW-2744).
TYPED_TEST(TestParquetIO, SingleEmptyListsColumnReadWrite) {
  std::shared_ptr<Table> table;
  ASSERT_NO_FATAL_FAILURE(this->PrepareEmptyListsTable(SMALL_SIZE, &table));
  ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
}
696 | |
// Roundtrip nullable lists of nullable elements.
TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) {
  std::shared_ptr<Table> table;
  ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, true, true, 10, &table));
  ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
}
702 | |
// Roundtrip required (non-null) lists of nullable elements.
TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) {
  std::shared_ptr<Table> table;
  ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, false, true, 10, &table));
  ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
}
708 | |
// Roundtrip nullable lists of required (non-null) elements.
TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) {
  std::shared_ptr<Table> table;
  ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, true, false, 10, &table));
  ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
}
714 | |
// Roundtrip required lists of required elements (no nulls anywhere).
TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) {
  std::shared_ptr<Table> table;
  ASSERT_NO_FATAL_FAILURE(this->PrepareListTable(SMALL_SIZE, false, false, 0, &table));
  ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
}
720 | |
// Roundtrip a nullable list wrapping required lists of required elements
// (two levels of nesting).
TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) {
  std::shared_ptr<Table> table;
  ASSERT_NO_FATAL_FAILURE(
      this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table));
  ASSERT_NO_FATAL_FAILURE(this->CheckRoundTrip(table));
}
727 | |
728 | TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { |
729 | std::shared_ptr<Array> values; |
730 | ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); |
731 | int64_t chunk_size = values->length() / 4; |
732 | |
733 | std::shared_ptr<GroupNode> schema = |
734 | MakeSimpleSchema(*values->type(), Repetition::REQUIRED); |
735 | FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); |
736 | for (int i = 0; i < 4; i++) { |
737 | ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); |
738 | std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size); |
739 | ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array)); |
740 | } |
741 | ASSERT_OK_NO_THROW(writer.Close()); |
742 | |
743 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); |
744 | } |
745 | |
// Write a LARGE_SIZE table with 512-row row groups via the Table API and
// verify the reassembled column.
TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
  std::shared_ptr<Array> values;
  ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values));
  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
  this->sink_ = std::make_shared<InMemoryOutputStream>();
  ASSERT_OK_NO_THROW(WriteTable(*table, default_memory_pool(), this->sink_, 512,
                                default_writer_properties()));

  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values));
}
756 | |
757 | TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { |
758 | std::shared_ptr<Array> values; |
759 | ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values)); |
760 | std::shared_ptr<Table> table = MakeSimpleTable(values, false); |
761 | this->sink_ = std::make_shared<InMemoryOutputStream>(); |
762 | auto buffer = AllocateBuffer(); |
763 | |
764 | { |
765 | // BufferOutputStream closed on gc |
766 | auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer); |
767 | ASSERT_OK_NO_THROW(WriteTable(*table, default_memory_pool(), arrow_sink_, 512, |
768 | default_writer_properties())); |
769 | |
770 | // XXX: Remove this after ARROW-455 completed |
771 | ASSERT_OK(arrow_sink_->Close()); |
772 | } |
773 | |
774 | auto pbuffer = std::make_shared<Buffer>(buffer->data(), buffer->size()); |
775 | |
776 | auto source = std::make_shared<BufferReader>(pbuffer); |
777 | std::shared_ptr<::arrow::Table> out; |
778 | std::unique_ptr<FileReader> reader; |
779 | ASSERT_OK_NO_THROW(OpenFile(source, ::arrow::default_memory_pool(), &reader)); |
780 | ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); |
781 | ASSERT_EQ(1, out->num_columns()); |
782 | ASSERT_EQ(values->length(), out->num_rows()); |
783 | |
784 | std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
785 | ASSERT_EQ(1, chunked_array->num_chunks()); |
786 | |
787 | AssertArraysEqual(*values, *chunked_array->chunk(0)); |
788 | } |
789 | |
790 | TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { |
791 | int64_t chunk_size = SMALL_SIZE / 4; |
792 | std::shared_ptr<Array> values; |
793 | |
794 | ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values)); |
795 | |
796 | std::shared_ptr<GroupNode> schema = |
797 | MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); |
798 | FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema)); |
799 | for (int i = 0; i < 4; i++) { |
800 | ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); |
801 | std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size); |
802 | ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array)); |
803 | } |
804 | ASSERT_OK_NO_THROW(writer.Close()); |
805 | |
806 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values)); |
807 | } |
808 | |
809 | TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { |
810 | // This also tests max_definition_level = 1 |
811 | std::shared_ptr<Array> values; |
812 | |
813 | ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values)); |
814 | std::shared_ptr<Table> table = MakeSimpleTable(values, true); |
815 | this->sink_ = std::make_shared<InMemoryOutputStream>(); |
816 | ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, |
817 | default_writer_properties())); |
818 | |
819 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values)); |
820 | } |
821 | |
822 | TYPED_TEST(TestParquetIO, FileMetaDataWrite) { |
823 | std::shared_ptr<Array> values; |
824 | ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values)); |
825 | std::shared_ptr<Table> table = MakeSimpleTable(values, false); |
826 | this->sink_ = std::make_shared<InMemoryOutputStream>(); |
827 | ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
828 | values->length(), default_writer_properties())); |
829 | |
830 | std::unique_ptr<FileReader> reader; |
831 | ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
832 | auto metadata = reader->parquet_reader()->metadata(); |
833 | ASSERT_EQ(1, metadata->num_columns()); |
834 | ASSERT_EQ(100, metadata->num_rows()); |
835 | |
836 | this->sink_ = std::make_shared<InMemoryOutputStream>(); |
837 | |
838 | ASSERT_OK_NO_THROW(::parquet::arrow::WriteFileMetaData(*metadata, this->sink_.get())); |
839 | |
840 | ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
841 | auto metadata_written = reader->parquet_reader()->metadata(); |
842 | ASSERT_EQ(metadata->size(), metadata_written->size()); |
843 | ASSERT_EQ(metadata->num_row_groups(), metadata_written->num_row_groups()); |
844 | ASSERT_EQ(metadata->num_rows(), metadata_written->num_rows()); |
845 | ASSERT_EQ(metadata->num_columns(), metadata_written->num_columns()); |
846 | ASSERT_EQ(metadata->RowGroup(0)->num_rows(), metadata_written->RowGroup(0)->num_rows()); |
847 | } |
848 | |
849 | using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>; |
850 | |
TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
  // This test explicitly tests the conversion from an Impala-style timestamp
  // to a nanoseconds-since-epoch one.

  // 2nd January 1970, 11:35min 145738543ns
  Int96 day;
  // 2440589 is the Julian day number for 1970-01-02; Int96SetNanoSeconds
  // below fills in the intra-day nanosecond portion.
  day.value[2] = UINT32_C(2440589);
  int64_t seconds = (11 * 60 + 35) * 60;
  Int96SetNanoSeconds(
      day, seconds * INT64_C(1000) * INT64_C(1000) * INT64_C(1000) + 145738543);
  // Compute the corresponding nanosecond timestamp
  struct tm datetime;
  memset(&datetime, 0, sizeof(struct tm));
  datetime.tm_year = 70;
  datetime.tm_mon = 0;
  datetime.tm_mday = 2;
  datetime.tm_hour = 11;
  datetime.tm_min = 35;
  struct tm epoch;
  memset(&epoch, 0, sizeof(struct tm));

  epoch.tm_year = 70;
  epoch.tm_mday = 1;
  // Nanoseconds since the epoch.  mktime() interprets both structs in local
  // time, so taking the difference of the two results cancels out the local
  // timezone offset.
  int64_t val = lrint(difftime(mktime(&datetime), mktime(&epoch))) * INT64_C(1000000000);
  val += 145738543;

  std::vector<std::shared_ptr<schema::Node>> fields(
      {schema::PrimitiveNode::Make("int96" , Repetition::REQUIRED, ParquetType::INT96)});
  std::shared_ptr<schema::GroupNode> schema = std::static_pointer_cast<GroupNode>(
      schema::GroupNode::Make("schema" , Repetition::REQUIRED, fields));

  // We cannot write this column with Arrow, so we have to use the plain parquet-cpp API
  // to write an Int96 file.
  this->sink_ = std::make_shared<InMemoryOutputStream>();
  auto writer = ParquetFileWriter::Open(this->sink_, schema);
  RowGroupWriter* rg_writer = writer->AppendRowGroup();
  ColumnWriter* c_writer = rg_writer->NextColumn();
  auto typed_writer = dynamic_cast<TypedColumnWriter<Int96Type>*>(c_writer);
  ASSERT_NE(typed_writer, nullptr);
  typed_writer->WriteBatch(1, nullptr, nullptr, &day);
  c_writer->Close();
  rg_writer->Close();
  writer->Close();

  // Build the single-element nanosecond-timestamp array we expect the Int96
  // column to be read back as.
  ::arrow::TimestampBuilder builder(::arrow::timestamp(TimeUnit::NANO),
                                    ::arrow::default_memory_pool());
  ASSERT_OK(builder.Append(val));
  std::shared_ptr<Array> values;
  ASSERT_OK(builder.Finish(&values));
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));
}
903 | |
904 | using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>; |
905 | |
906 | TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { |
907 | // This also tests max_definition_level = 1 |
908 | std::shared_ptr<Array> values; |
909 | |
910 | ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &values)); |
911 | std::shared_ptr<Table> table = MakeSimpleTable(values, true); |
912 | |
913 | // Parquet 2.0 roundtrip should yield an uint32_t column again |
914 | this->sink_ = std::make_shared<InMemoryOutputStream>(); |
915 | std::shared_ptr<::parquet::WriterProperties> properties = |
916 | ::parquet::WriterProperties::Builder() |
917 | .version(ParquetVersion::PARQUET_2_0) |
918 | ->build(); |
919 | ASSERT_OK_NO_THROW( |
920 | WriteTable(*table, default_memory_pool(), this->sink_, 512, properties)); |
921 | ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values)); |
922 | } |
923 | |
TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
  // This also tests max_definition_level = 1
  std::shared_ptr<Array> arr;
  ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &arr));

  std::shared_ptr<::arrow::UInt32Array> values =
      std::dynamic_pointer_cast<::arrow::UInt32Array>(arr);

  std::shared_ptr<Table> table = MakeSimpleTable(values, true);

  // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0
  // reader that a column is unsigned.
  this->sink_ = std::make_shared<InMemoryOutputStream>();
  std::shared_ptr<::parquet::WriterProperties> properties =
      ::parquet::WriterProperties::Builder()
          .version(ParquetVersion::PARQUET_1_0)
          ->build();
  ASSERT_OK_NO_THROW(
      WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties));

  // Manually widen the uint32 input values to int64 so we can build the
  // expected (int64-typed) result array.
  std::shared_ptr<ResizableBuffer> int64_data = AllocateBuffer();
  {
    ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
    auto int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
    auto uint32_data_ptr = reinterpret_cast<const uint32_t*>(values->values()->data());
    const auto cast_uint32_to_int64 = [](uint32_t value) {
      return static_cast<int64_t>(value);
    };
    std::transform(uint32_data_ptr, uint32_data_ptr + values->length(), int64_data_ptr,
                   cast_uint32_to_int64);
  }

  // Reuse the input's null bitmap; only the value buffer was widened.
  std::vector<std::shared_ptr<Buffer>> buffers{values->null_bitmap(), int64_data};
  auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(),
                                                       buffers, values->null_count());
  std::shared_ptr<Array> expected_values = MakeArray(arr_data);
  ASSERT_NE(expected_values, NULLPTR);

  const auto& expected = static_cast<const ::arrow::Int64Array&>(*expected_values);
  ASSERT_GT(values->length(), 0);
  ASSERT_EQ(values->length(), expected.length());

  // TODO(phillipc): Is there a better way to compare these two arrays?
  // AssertArraysEqual requires the same type, but we only care about values in this case
  for (int i = 0; i < expected.length(); ++i) {
    const bool value_is_valid = values->IsValid(i);
    const bool expected_value_is_valid = expected.IsValid(i);

    ASSERT_EQ(expected_value_is_valid, value_is_valid);

    if (value_is_valid) {
      uint32_t value = values->Value(i);
      int64_t expected_value = expected.Value(i);
      ASSERT_EQ(expected_value, static_cast<int64_t>(value));
    }
  }
}
981 | |
982 | using TestStringParquetIO = TestParquetIO<::arrow::StringType>; |
983 | |
984 | TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) { |
985 | std::shared_ptr<Array> values; |
986 | ::arrow::StringBuilder builder; |
987 | for (size_t i = 0; i < SMALL_SIZE; i++) { |
988 | ASSERT_OK(builder.Append("" )); |
989 | } |
990 | ASSERT_OK(builder.Finish(&values)); |
991 | std::shared_ptr<Table> table = MakeSimpleTable(values, false); |
992 | this->sink_ = std::make_shared<InMemoryOutputStream>(); |
993 | ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
994 | values->length(), default_writer_properties())); |
995 | |
996 | std::shared_ptr<Table> out; |
997 | std::unique_ptr<FileReader> reader; |
998 | ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
999 | ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); |
1000 | ASSERT_EQ(1, out->num_columns()); |
1001 | ASSERT_EQ(100, out->num_rows()); |
1002 | |
1003 | std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
1004 | ASSERT_EQ(1, chunked_array->num_chunks()); |
1005 | |
1006 | AssertArraysEqual(*values, *chunked_array->chunk(0)); |
1007 | } |
1008 | |
1009 | using TestNullParquetIO = TestParquetIO<::arrow::NullType>; |
1010 | |
1011 | TEST_F(TestNullParquetIO, NullColumn) { |
1012 | for (int32_t num_rows : {0, SMALL_SIZE}) { |
1013 | std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(num_rows); |
1014 | std::shared_ptr<Table> table = MakeSimpleTable(values, true /* nullable */); |
1015 | this->sink_ = std::make_shared<InMemoryOutputStream>(); |
1016 | |
1017 | const int64_t chunk_size = std::max(static_cast<int64_t>(1), table->num_rows()); |
1018 | ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
1019 | chunk_size, default_writer_properties())); |
1020 | |
1021 | std::shared_ptr<Table> out; |
1022 | std::unique_ptr<FileReader> reader; |
1023 | ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
1024 | ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); |
1025 | ASSERT_EQ(1, out->num_columns()); |
1026 | ASSERT_EQ(num_rows, out->num_rows()); |
1027 | |
1028 | std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
1029 | ASSERT_EQ(1, chunked_array->num_chunks()); |
1030 | AssertArraysEqual(*values, *chunked_array->chunk(0)); |
1031 | } |
1032 | } |
1033 | |
1034 | TEST_F(TestNullParquetIO, NullListColumn) { |
1035 | std::vector<int32_t> offsets1 = {0}; |
1036 | std::vector<int32_t> offsets2 = {0, 2, 2, 3, 115}; |
1037 | for (std::vector<int32_t> offsets : {offsets1, offsets2}) { |
1038 | std::shared_ptr<Array> offsets_array, values_array, list_array; |
1039 | ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offsets, &offsets_array); |
1040 | values_array = std::make_shared<::arrow::NullArray>(offsets.back()); |
1041 | ASSERT_OK(::arrow::ListArray::FromArrays(*offsets_array, *values_array, |
1042 | default_memory_pool(), &list_array)); |
1043 | |
1044 | std::shared_ptr<Table> table = MakeSimpleTable(list_array, false /* nullable */); |
1045 | this->sink_ = std::make_shared<InMemoryOutputStream>(); |
1046 | |
1047 | const int64_t chunk_size = std::max(static_cast<int64_t>(1), table->num_rows()); |
1048 | ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
1049 | chunk_size, default_writer_properties())); |
1050 | |
1051 | std::shared_ptr<Table> out; |
1052 | std::unique_ptr<FileReader> reader; |
1053 | this->ReaderFromSink(&reader); |
1054 | this->ReadTableFromFile(std::move(reader), &out); |
1055 | ASSERT_EQ(1, out->num_columns()); |
1056 | ASSERT_EQ(offsets.size() - 1, out->num_rows()); |
1057 | |
1058 | std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
1059 | ASSERT_EQ(1, chunked_array->num_chunks()); |
1060 | AssertArraysEqual(*list_array, *chunked_array->chunk(0)); |
1061 | } |
1062 | } |
1063 | |
1064 | TEST_F(TestNullParquetIO, NullDictionaryColumn) { |
1065 | std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(0); |
1066 | std::shared_ptr<Array> indices = |
1067 | std::make_shared<::arrow::Int8Array>(SMALL_SIZE, nullptr, nullptr, SMALL_SIZE); |
1068 | std::shared_ptr<::arrow::DictionaryType> dict_type = |
1069 | std::make_shared<::arrow::DictionaryType>(::arrow::int8(), values); |
1070 | std::shared_ptr<Array> dict_values = |
1071 | std::make_shared<::arrow::DictionaryArray>(dict_type, indices); |
1072 | std::shared_ptr<Table> table = MakeSimpleTable(dict_values, true); |
1073 | this->sink_ = std::make_shared<InMemoryOutputStream>(); |
1074 | ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, |
1075 | dict_values->length(), default_writer_properties())); |
1076 | |
1077 | std::shared_ptr<Table> out; |
1078 | std::unique_ptr<FileReader> reader; |
1079 | ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader)); |
1080 | ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out)); |
1081 | ASSERT_EQ(1, out->num_columns()); |
1082 | ASSERT_EQ(100, out->num_rows()); |
1083 | |
1084 | std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); |
1085 | ASSERT_EQ(1, chunked_array->num_chunks()); |
1086 | |
1087 | std::shared_ptr<Array> expected_values = |
1088 | std::make_shared<::arrow::NullArray>(SMALL_SIZE); |
1089 | AssertArraysEqual(*expected_values, *chunked_array->chunk(0)); |
1090 | } |
1091 | |
// Maps an Arrow type to the physical C type Parquet uses to store it.
template <typename T>
using ParquetCDataType = typename ParquetDataType<T>::c_type;

// Yields the C type used to hold in-memory test values for an Arrow type.
template <typename T>
struct c_type_trait {
  using ArrowCType = typename T::c_type;
};

// Booleans are held as uint8_t (one byte per value) so that the test value
// vectors stay contiguous; std::vector<bool> is a packed specialization
// without a data() member and cannot back the buffer copies below.
template <>
struct c_type_trait<::arrow::BooleanType> {
  using ArrowCType = uint8_t;
};
1104 | |
// Fixture that writes primitive-typed columns with the low-level parquet-cpp
// column writer and reads them back through the Arrow reader.
template <typename TestType>
class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
 public:
  // In-memory C type of the test values (uint8_t for booleans, see
  // c_type_trait above).
  typedef typename c_type_trait<TestType>::ArrowCType T;

  // Writes `values` as a single required column split across `num_chunks`
  // row groups, then opens a reader over the written file.
  void MakeTestFile(std::vector<T>& values, int num_chunks,
                    std::unique_ptr<FileReader>* reader) {
    TestType dummy;

    std::shared_ptr<GroupNode> schema = MakeSimpleSchema(dummy, Repetition::REQUIRED);
    std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
    size_t chunk_size = values.size() / num_chunks;
    // Convert to Parquet's expected physical type
    std::vector<uint8_t> values_buffer(sizeof(ParquetCDataType<TestType>) *
                                       values.size());
    auto values_parquet =
        reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
    std::copy(values.cbegin(), values.cend(), values_parquet);
    for (int i = 0; i < num_chunks; i++) {
      auto row_group_writer = file_writer->AppendRowGroup();
      auto column_writer =
          static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
      // Each chunk writes the next contiguous slice of the converted buffer.
      ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
      column_writer->Close();
      row_group_writer->Close();
    }
    file_writer->Close();
    this->ReaderFromSink(reader);
  }

  // Round-trips SMALL_SIZE identical values via the Table read path and
  // checks the resulting single-chunk column.
  void CheckSingleColumnRequiredTableRead(int num_chunks) {
    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
    std::unique_ptr<FileReader> file_reader;
    ASSERT_NO_FATAL_FAILURE(MakeTestFile(values, num_chunks, &file_reader));

    std::shared_ptr<Table> out;
    this->ReadTableFromFile(std::move(file_reader), &out);
    ASSERT_EQ(1, out->num_columns());
    ASSERT_EQ(SMALL_SIZE, out->num_rows());

    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
    ASSERT_EQ(1, chunked_array->num_chunks());
    ExpectArrayT<TestType>(values.data(), chunked_array->chunk(0).get());
  }

  // Same round trip as above but through the single-column Array read path.
  void CheckSingleColumnRequiredRead(int num_chunks) {
    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
    std::unique_ptr<FileReader> file_reader;
    ASSERT_NO_FATAL_FAILURE(MakeTestFile(values, num_chunks, &file_reader));

    std::shared_ptr<Array> out;
    this->ReadSingleColumnFile(std::move(file_reader), &out);

    ExpectArrayT<TestType>(values.data(), out.get());
  }
};
1162 | |
// The primitive Arrow types exercised by TestPrimitiveParquetIO below.
typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
                         ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type,
                         ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type,
                         ::arrow::FloatType, ::arrow::DoubleType>
    PrimitiveTestTypes;

TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
1170 | |
// Single row group, read as an Array.
TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
  ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredRead(1));
}

// Single row group, read as a Table.
TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
  ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredTableRead(1));
}

// Four row groups, read as an Array.
TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
  ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredRead(4));
}

// Four row groups, read as a Table.
TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
  ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredTableRead(4));
}
1186 | |
1187 | void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros = false) { |
1188 | using ::arrow::ArrayFromVector; |
1189 | |
1190 | std::vector<bool> is_valid = {true, true, true, false, true, true}; |
1191 | |
1192 | // These are only types that roundtrip without modification |
1193 | auto f0 = field("f0" , ::arrow::date32()); |
1194 | auto f1 = field("f1" , ::arrow::timestamp(TimeUnit::MILLI)); |
1195 | auto f2 = field("f2" , ::arrow::timestamp(TimeUnit::MICRO)); |
1196 | auto f3_unit = nanos_as_micros ? TimeUnit::MICRO : TimeUnit::NANO; |
1197 | auto f3 = field("f3" , ::arrow::timestamp(f3_unit)); |
1198 | auto f4 = field("f4" , ::arrow::time32(TimeUnit::MILLI)); |
1199 | auto f5 = field("f5" , ::arrow::time64(TimeUnit::MICRO)); |
1200 | |
1201 | std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5})); |
1202 | |
1203 | std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000, |
1204 | 1489272000, 1489272000, 1489273000}; |
1205 | std::vector<int64_t> t64_ns_values = {1489269000000, 1489270000000, 1489271000000, |
1206 | 1489272000000, 1489272000000, 1489273000000}; |
1207 | std::vector<int64_t> t64_us_values = {1489269000, 1489270000, 1489271000, |
1208 | 1489272000, 1489272000, 1489273000}; |
1209 | std::vector<int64_t> t64_ms_values = {1489269, 1489270, 1489271, |
1210 | 1489272, 1489272, 1489273}; |
1211 | |
1212 | std::shared_ptr<Array> a0, a1, a2, a3, a4, a5; |
1213 | ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0); |
1214 | ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values, |
1215 | &a1); |
1216 | ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values, |
1217 | &a2); |
1218 | auto f3_data = nanos_as_micros ? t64_us_values : t64_ns_values; |
1219 | ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, f3_data, &a3); |
1220 | ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4); |
1221 | ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5); |
1222 | |
1223 | std::vector<std::shared_ptr<::arrow::Column>> columns = { |
1224 | std::make_shared<Column>("f0" , a0), std::make_shared<Column>("f1" , a1), |
1225 | std::make_shared<Column>("f2" , a2), std::make_shared<Column>("f3" , a3), |
1226 | std::make_shared<Column>("f4" , a4), std::make_shared<Column>("f5" , a5)}; |
1227 | |
1228 | *out = Table::Make(schema, columns); |
1229 | } |
1230 | |
TEST(TestArrowReadWrite, DateTimeTypes) {
  std::shared_ptr<Table> table, result;
  MakeDateTimeTypesTable(&table);

  // Cast nanoseconds to microseconds and use INT64 physical type
  ASSERT_NO_FATAL_FAILURE(
      DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
  // Rebuild the expected table with the nanosecond column expressed in
  // microseconds, matching the cast performed by the roundtrip above.
  MakeDateTimeTypesTable(&table, true);

  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
}
1242 | |
TEST(TestArrowReadWrite, UseDeprecatedInt96) {
  using ::arrow::ArrayFromVector;
  using ::arrow::field;
  using ::arrow::schema;

  std::vector<bool> is_valid = {true, true, true, false, true, true};

  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
  auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);

  // The same six instants expressed in each unit.
  std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
  std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000,
                                    1489272001, 1489272000, 1489273000};
  std::vector<int64_t> us_values = {1489269000000, 1489270000000, 1489271000000,
                                    1489272000001, 1489272000000, 1489273000000};
  std::vector<int64_t> ns_values = {1489269000000000LL, 1489270000000000LL,
                                    1489271000000000LL, 1489272000000001LL,
                                    1489272000000000LL, 1489273000000000LL};

  std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);

  // Each input is typed with a unique TimeUnit
  auto input_schema = schema(
      {field("f_s" , t_s), field("f_ms" , t_ms), field("f_us" , t_us), field("f_ns" , t_ns)});
  auto input = Table::Make(
      input_schema,
      {std::make_shared<Column>("f_s" , a_s), std::make_shared<Column>("f_ms" , a_ms),
       std::make_shared<Column>("f_us" , a_us), std::make_shared<Column>("f_ns" , a_ns)});

  // When reading parquet files, all int96 schema fields are converted to
  // timestamp nanoseconds
  auto ex_schema = schema({field("f_s" , t_ns), field("f_ms" , t_ns), field("f_us" , t_ns),
                           field("f_ns" , t_ns)});
  auto ex_result = Table::Make(
      ex_schema,
      {std::make_shared<Column>("f_s" , a_ns), std::make_shared<Column>("f_ms" , a_ns),
       std::make_shared<Column>("f_us" , a_ns), std::make_shared<Column>("f_ns" , a_ns)});

  std::shared_ptr<Table> result;
  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
      input, false /* use_threads */, input->num_rows(), {}, &result,
      ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));

  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));

  // Ensure enable_deprecated_int96_timestamps takes precedence over
  // coerce_timestamps.
  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(input, false /* use_threads */,
                                            input->num_rows(), {}, &result,
                                            ArrowWriterProperties::Builder()
                                                .enable_deprecated_int96_timestamps()
                                                ->coerce_timestamps(TimeUnit::MILLI)
                                                ->build()));

  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
}
1305 | |
1306 | TEST(TestArrowReadWrite, CoerceTimestamps) { |
1307 | using ::arrow::ArrayFromVector; |
1308 | using ::arrow::field; |
1309 | |
1310 | // PARQUET-1078, coerce Arrow timestamps to either TIMESTAMP_MILLIS or TIMESTAMP_MICROS |
1311 | std::vector<bool> is_valid = {true, true, true, false, true, true}; |
1312 | |
1313 | auto t_s = ::arrow::timestamp(TimeUnit::SECOND); |
1314 | auto t_ms = ::arrow::timestamp(TimeUnit::MILLI); |
1315 | auto t_us = ::arrow::timestamp(TimeUnit::MICRO); |
1316 | auto t_ns = ::arrow::timestamp(TimeUnit::NANO); |
1317 | |
1318 | std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273}; |
1319 | std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000, |
1320 | 1489272001, 1489272000, 1489273000}; |
1321 | std::vector<int64_t> us_values = {1489269000000, 1489270000000, 1489271000000, |
1322 | 1489272000001, 1489272000000, 1489273000000}; |
1323 | std::vector<int64_t> ns_values = {1489269000000000LL, 1489270000000000LL, |
1324 | 1489271000000000LL, 1489272000000001LL, |
1325 | 1489272000000000LL, 1489273000000000LL}; |
1326 | |
1327 | std::shared_ptr<Array> a_s, a_ms, a_us, a_ns; |
1328 | ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s); |
1329 | ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms); |
1330 | ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us); |
1331 | ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns); |
1332 | |
1333 | // Input table, all data as is |
1334 | auto s1 = std::shared_ptr<::arrow::Schema>( |
1335 | new ::arrow::Schema({field("f_s" , t_s), field("f_ms" , t_ms), field("f_us" , t_us), |
1336 | field("f_ns" , t_ns)})); |
1337 | auto input = Table::Make( |
1338 | s1, |
1339 | {std::make_shared<Column>("f_s" , a_s), std::make_shared<Column>("f_ms" , a_ms), |
1340 | std::make_shared<Column>("f_us" , a_us), std::make_shared<Column>("f_ns" , a_ns)}); |
1341 | |
1342 | // Result when coercing to milliseconds |
1343 | auto s2 = std::shared_ptr<::arrow::Schema>( |
1344 | new ::arrow::Schema({field("f_s" , t_ms), field("f_ms" , t_ms), field("f_us" , t_ms), |
1345 | field("f_ns" , t_ms)})); |
1346 | auto ex_milli_result = Table::Make( |
1347 | s2, |
1348 | {std::make_shared<Column>("f_s" , a_ms), std::make_shared<Column>("f_ms" , a_ms), |
1349 | std::make_shared<Column>("f_us" , a_ms), std::make_shared<Column>("f_ns" , a_ms)}); |
1350 | |
1351 | std::shared_ptr<Table> milli_result; |
1352 | ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( |
1353 | input, false /* use_threads */, input->num_rows(), {}, &milli_result, |
1354 | ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build())); |
1355 | ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result)); |
1356 | |
1357 | // Result when coercing to microseconds |
1358 | auto s3 = std::shared_ptr<::arrow::Schema>( |
1359 | new ::arrow::Schema({field("f_s" , t_us), field("f_ms" , t_us), field("f_us" , t_us), |
1360 | field("f_ns" , t_us)})); |
1361 | auto ex_micro_result = Table::Make( |
1362 | s3, |
1363 | {std::make_shared<Column>("f_s" , a_us), std::make_shared<Column>("f_ms" , a_us), |
1364 | std::make_shared<Column>("f_us" , a_us), std::make_shared<Column>("f_ns" , a_us)}); |
1365 | |
1366 | std::shared_ptr<Table> micro_result; |
1367 | ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip( |
1368 | input, false /* use_threads */, input->num_rows(), {}, µ_result, |
1369 | ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build())); |
1370 | ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_micro_result, *micro_result)); |
1371 | } |
1372 | |
TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
  using ::arrow::ArrayFromVector;
  using ::arrow::field;

  // PARQUET-1078, coerce Arrow timestamps to either TIMESTAMP_MILLIS or TIMESTAMP_MICROS
  std::vector<bool> is_valid = {true, true, true, false, true, true};

  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
  auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);

  // Each finer-grained vector carries a trailing "1" in its lowest digit so
  // that coercion to a coarser unit would have to truncate it.
  std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
  std::vector<int64_t> ms_values = {1489269001, 1489270001, 1489271001,
                                    1489272001, 1489272001, 1489273001};
  std::vector<int64_t> us_values = {1489269000001, 1489270000001, 1489271000001,
                                    1489272000001, 1489272000001, 1489273000001};
  std::vector<int64_t> ns_values = {1489269000000001LL, 1489270000000001LL,
                                    1489271000000001LL, 1489272000000001LL,
                                    1489272000000001LL, 1489273000000001LL};

  std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);

  // One single-column table per unit.
  auto s1 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_s" , t_s)}));
  auto s2 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ms" , t_ms)}));
  auto s3 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_us" , t_us)}));
  auto s4 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ns" , t_ns)}));

  auto c1 = std::make_shared<Column>("f_s" , a_s);
  auto c2 = std::make_shared<Column>("f_ms" , a_ms);
  auto c3 = std::make_shared<Column>("f_us" , a_us);
  auto c4 = std::make_shared<Column>("f_ns" , a_ns);

  auto t1 = Table::Make(s1, {c1});
  auto t2 = Table::Make(s2, {c2});
  auto t3 = Table::Make(s3, {c3});
  auto t4 = Table::Make(s4, {c4});

  auto sink = std::make_shared<InMemoryOutputStream>();

  // OK to write to millis
  auto coerce_millis =
      (ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build());
  ASSERT_OK_NO_THROW(WriteTable(*t1, ::arrow::default_memory_pool(), sink, 10,
                                default_writer_properties(), coerce_millis));
  ASSERT_OK_NO_THROW(WriteTable(*t2, ::arrow::default_memory_pool(), sink, 10,
                                default_writer_properties(), coerce_millis));

  // Loss of precision
  ASSERT_RAISES(Invalid, WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
                                    default_writer_properties(), coerce_millis));
  ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
                                    default_writer_properties(), coerce_millis));

  // OK to lose precision if we explicitly allow it
  auto allow_truncation = (ArrowWriterProperties::Builder()
                               .coerce_timestamps(TimeUnit::MILLI)
                               ->allow_truncated_timestamps()
                               ->build());
  ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
                                default_writer_properties(), allow_truncation));
  ASSERT_OK_NO_THROW(WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
                                default_writer_properties(), allow_truncation));

  // OK to write micros to micros
  auto coerce_micros =
      (ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
  ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
                                default_writer_properties(), coerce_micros));

  // Loss of precision
  ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
                                    default_writer_properties(), coerce_micros));
}
1451 | |
// Arrow date64 and time32(SECOND) have no exact Parquet logical type: they
// are converted on write and therefore read back as date32 (days) and
// time32(MILLI) respectively. Verified for both nullable and non-nullable
// columns carrying the same data.
TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
  using ::arrow::ArrayFromVector;

  // One null at index 3; f2/f3 reuse the same values without a validity bitmap.
  std::vector<bool> is_valid = {true, true, true, false, true, true};

  auto f0 = field("f0", ::arrow::date64());
  auto f1 = field("f1", ::arrow::time32(TimeUnit::SECOND));
  auto f2 = field("f2", ::arrow::date64());
  auto f3 = field("f3", ::arrow::time32(TimeUnit::SECOND));

  auto schema = ::arrow::schema({f0, f1, f2, f3});

  // date64 milliseconds, midnight-aligned, one day apart.
  std::vector<int64_t> a0_values = {1489190400000, 1489276800000, 1489363200000,
                                    1489449600000, 1489536000000, 1489622400000};
  // time32 values in seconds.
  std::vector<int32_t> a1_values = {0, 1, 2, 3, 4, 5};

  std::shared_ptr<Array> a0, a1, a0_nonnull, a1_nonnull, x0, x1, x0_nonnull, x1_nonnull;

  ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, a0_values, &a0);
  ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), a0_values, &a0_nonnull);

  ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), is_valid, a1_values, &a1);
  ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), a1_values, &a1_nonnull);

  std::vector<std::shared_ptr<::arrow::Column>> columns = {
      std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
      std::make_shared<Column>("f2", a0_nonnull),
      std::make_shared<Column>("f3", a1_nonnull)};
  auto table = Table::Make(schema, columns);

  // Expected schema and values
  auto e0 = field("f0", ::arrow::date32());
  auto e1 = field("f1", ::arrow::time32(TimeUnit::MILLI));
  auto e2 = field("f2", ::arrow::date32());
  auto e3 = field("f3", ::arrow::time32(TimeUnit::MILLI));
  auto ex_schema = ::arrow::schema({e0, e1, e2, e3});

  // The same instants expressed as days-since-epoch / milliseconds.
  std::vector<int32_t> x0_values = {17236, 17237, 17238, 17239, 17240, 17241};
  std::vector<int32_t> x1_values = {0, 1000, 2000, 3000, 4000, 5000};
  ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), is_valid, x0_values, &x0);
  ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), x0_values, &x0_nonnull);

  ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), is_valid, x1_values, &x1);
  ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), x1_values, &x1_nonnull);

  std::vector<std::shared_ptr<::arrow::Column>> ex_columns = {
      std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1),
      std::make_shared<Column>("f2", x0_nonnull),
      std::make_shared<Column>("f3", x1_nonnull)};
  auto ex_table = Table::Make(ex_schema, ex_columns);

  std::shared_ptr<Table> result;
  ASSERT_NO_FATAL_FAILURE(
      DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));

  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_table, *result));
}
1509 | |
1510 | void MakeDoubleTable(int num_columns, int num_rows, int nchunks, |
1511 | std::shared_ptr<Table>* out) { |
1512 | std::shared_ptr<::arrow::Column> column; |
1513 | std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns); |
1514 | std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns); |
1515 | |
1516 | for (int i = 0; i < num_columns; ++i) { |
1517 | std::vector<std::shared_ptr<Array>> arrays; |
1518 | std::shared_ptr<Array> values; |
1519 | ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, |
1520 | static_cast<uint32_t>(i), &values)); |
1521 | std::stringstream ss; |
1522 | ss << "col" << i; |
1523 | |
1524 | for (int j = 0; j < nchunks; ++j) { |
1525 | arrays.push_back(values); |
1526 | } |
1527 | column = MakeColumn(ss.str(), arrays, true); |
1528 | |
1529 | columns[i] = column; |
1530 | fields[i] = column->field(); |
1531 | } |
1532 | auto schema = std::make_shared<::arrow::Schema>(fields); |
1533 | *out = Table::Make(schema, columns); |
1534 | } |
1535 | |
1536 | void MakeListArray(int num_rows, int max_value_length, |
1537 | std::shared_ptr<::DataType>* out_type, |
1538 | std::shared_ptr<Array>* out_array) { |
1539 | std::vector<int32_t> length_draws; |
1540 | randint(num_rows, 0, max_value_length, &length_draws); |
1541 | |
1542 | std::vector<int32_t> offset_values; |
1543 | |
1544 | // Make sure some of them are length 0 |
1545 | int32_t total_elements = 0; |
1546 | for (size_t i = 0; i < length_draws.size(); ++i) { |
1547 | if (length_draws[i] < max_value_length / 10) { |
1548 | length_draws[i] = 0; |
1549 | } |
1550 | offset_values.push_back(total_elements); |
1551 | total_elements += length_draws[i]; |
1552 | } |
1553 | offset_values.push_back(total_elements); |
1554 | |
1555 | std::vector<int8_t> value_draws; |
1556 | randint(total_elements, 0, 100, &value_draws); |
1557 | |
1558 | std::vector<bool> is_valid; |
1559 | random_is_valid(total_elements, 0.1, &is_valid); |
1560 | |
1561 | std::shared_ptr<Array> values, offsets; |
1562 | ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid, |
1563 | value_draws, &values); |
1564 | ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets); |
1565 | |
1566 | ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(), |
1567 | out_array)); |
1568 | |
1569 | *out_type = ::arrow::list(::arrow::int8()); |
1570 | } |
1571 | |
1572 | TEST(TestArrowReadWrite, MultithreadedRead) { |
1573 | const int num_columns = 20; |
1574 | const int num_rows = 1000; |
1575 | const bool use_threads = true; |
1576 | |
1577 | std::shared_ptr<Table> table; |
1578 | ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
1579 | |
1580 | std::shared_ptr<Table> result; |
1581 | ASSERT_NO_FATAL_FAILURE( |
1582 | DoSimpleRoundtrip(table, use_threads, table->num_rows(), {}, &result)); |
1583 | |
1584 | ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result)); |
1585 | } |
1586 | |
1587 | TEST(TestArrowReadWrite, ReadSingleRowGroup) { |
1588 | const int num_columns = 20; |
1589 | const int num_rows = 1000; |
1590 | |
1591 | std::shared_ptr<Table> table; |
1592 | ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
1593 | |
1594 | std::shared_ptr<Buffer> buffer; |
1595 | ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2, |
1596 | default_arrow_writer_properties(), &buffer)); |
1597 | |
1598 | std::unique_ptr<FileReader> reader; |
1599 | ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
1600 | ::arrow::default_memory_pool(), |
1601 | ::parquet::default_reader_properties(), nullptr, &reader)); |
1602 | |
1603 | ASSERT_EQ(2, reader->num_row_groups()); |
1604 | |
1605 | std::shared_ptr<Table> r1, r2, r3, r4; |
1606 | // Read everything |
1607 | ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1)); |
1608 | ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2)); |
1609 | ASSERT_OK_NO_THROW(reader->ReadRowGroups({0, 1}, &r3)); |
1610 | ASSERT_OK_NO_THROW(reader->ReadRowGroups({1}, &r4)); |
1611 | |
1612 | std::shared_ptr<Table> concatenated; |
1613 | |
1614 | ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated)); |
1615 | ASSERT_TRUE(table->Equals(*concatenated)); |
1616 | |
1617 | ASSERT_TRUE(table->Equals(*r3)); |
1618 | ASSERT_TRUE(r2->Equals(*r4)); |
1619 | ASSERT_OK(ConcatenateTables({r1, r4}, &concatenated)); |
1620 | ASSERT_TRUE(table->Equals(*concatenated)); |
1621 | } |
1622 | |
1623 | TEST(TestArrowReadWrite, GetRecordBatchReader) { |
1624 | const int num_columns = 20; |
1625 | const int num_rows = 1000; |
1626 | |
1627 | std::shared_ptr<Table> table; |
1628 | ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
1629 | |
1630 | std::shared_ptr<Buffer> buffer; |
1631 | ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2, |
1632 | default_arrow_writer_properties(), &buffer)); |
1633 | |
1634 | std::unique_ptr<FileReader> reader; |
1635 | ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
1636 | ::arrow::default_memory_pool(), |
1637 | ::parquet::default_reader_properties(), nullptr, &reader)); |
1638 | |
1639 | std::shared_ptr<::arrow::RecordBatchReader> rb_reader; |
1640 | ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader)); |
1641 | |
1642 | std::shared_ptr<::arrow::RecordBatch> batch; |
1643 | |
1644 | ASSERT_OK(rb_reader->ReadNext(&batch)); |
1645 | ASSERT_EQ(500, batch->num_rows()); |
1646 | ASSERT_EQ(20, batch->num_columns()); |
1647 | |
1648 | ASSERT_OK(rb_reader->ReadNext(&batch)); |
1649 | ASSERT_EQ(500, batch->num_rows()); |
1650 | ASSERT_EQ(20, batch->num_columns()); |
1651 | |
1652 | ASSERT_OK(rb_reader->ReadNext(&batch)); |
1653 | ASSERT_EQ(nullptr, batch); |
1654 | } |
1655 | |
1656 | TEST(TestArrowReadWrite, ScanContents) { |
1657 | const int num_columns = 20; |
1658 | const int num_rows = 1000; |
1659 | |
1660 | std::shared_ptr<Table> table; |
1661 | ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
1662 | |
1663 | std::shared_ptr<Buffer> buffer; |
1664 | ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2, |
1665 | default_arrow_writer_properties(), &buffer)); |
1666 | |
1667 | std::unique_ptr<FileReader> reader; |
1668 | ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), |
1669 | ::arrow::default_memory_pool(), |
1670 | ::parquet::default_reader_properties(), nullptr, &reader)); |
1671 | |
1672 | int64_t num_rows_returned = 0; |
1673 | ASSERT_OK_NO_THROW(reader->ScanContents({}, 256, &num_rows_returned)); |
1674 | ASSERT_EQ(num_rows, num_rows_returned); |
1675 | |
1676 | ASSERT_OK_NO_THROW(reader->ScanContents({0, 1, 2}, 256, &num_rows_returned)); |
1677 | ASSERT_EQ(num_rows, num_rows_returned); |
1678 | } |
1679 | |
1680 | TEST(TestArrowReadWrite, ReadColumnSubset) { |
1681 | const int num_columns = 20; |
1682 | const int num_rows = 1000; |
1683 | const bool use_threads = true; |
1684 | |
1685 | std::shared_ptr<Table> table; |
1686 | ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
1687 | |
1688 | std::shared_ptr<Table> result; |
1689 | std::vector<int> column_subset = {0, 4, 8, 10}; |
1690 | ASSERT_NO_FATAL_FAILURE( |
1691 | DoSimpleRoundtrip(table, use_threads, table->num_rows(), column_subset, &result)); |
1692 | |
1693 | std::vector<std::shared_ptr<::arrow::Column>> ex_columns; |
1694 | std::vector<std::shared_ptr<::arrow::Field>> ex_fields; |
1695 | for (int i : column_subset) { |
1696 | ex_columns.push_back(table->column(i)); |
1697 | ex_fields.push_back(table->column(i)->field()); |
1698 | } |
1699 | |
1700 | auto ex_schema = ::arrow::schema(ex_fields); |
1701 | auto expected = Table::Make(ex_schema, ex_columns); |
1702 | ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*expected, *result)); |
1703 | } |
1704 | |
TEST(TestArrowReadWrite, ListLargeRecords) {
  // PARQUET-1308: This test passed on Linux when num_rows was smaller
  const int num_rows = 2000;
  const int row_group_size = 100;  // => 20 row groups in the written file

  std::shared_ptr<Array> list_array;
  std::shared_ptr<::DataType> list_type;

  MakeListArray(num_rows, 20, &list_type, &list_array);

  auto schema = ::arrow::schema({::arrow::field("a", list_type)});

  std::shared_ptr<Table> table = Table::Make(schema, {list_array});

  std::shared_ptr<Buffer> buffer;
  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
                                             default_arrow_writer_properties(), &buffer));

  std::unique_ptr<FileReader> reader;
  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
                              ::arrow::default_memory_pool(),
                              ::parquet::default_reader_properties(), nullptr, &reader));

  // Read everything
  std::shared_ptr<Table> result;
  ASSERT_OK_NO_THROW(reader->ReadTable(&result));
  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));

  // Read 1 record at a time; reopen the file so the column reader starts
  // from the beginning again.
  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
                              ::arrow::default_memory_pool(),
                              ::parquet::default_reader_properties(), nullptr, &reader));

  std::unique_ptr<ColumnReader> col_reader;
  ASSERT_OK(reader->GetColumn(0, &col_reader));

  // Each NextBatch(1) call must yield exactly one single-element chunk, even
  // across row-group boundaries.
  std::vector<std::shared_ptr<Array>> pieces;
  for (int i = 0; i < num_rows; ++i) {
    std::shared_ptr<ChunkedArray> chunked_piece;
    ASSERT_OK(col_reader->NextBatch(1, &chunked_piece));
    ASSERT_EQ(1, chunked_piece->length());
    ASSERT_EQ(1, chunked_piece->num_chunks());
    pieces.push_back(chunked_piece->chunk(0));
  }
  // Reassembling the per-record pieces must reproduce the original column.
  auto chunked = std::make_shared<::arrow::ChunkedArray>(pieces);

  auto chunked_col =
      std::make_shared<::arrow::Column>(table->schema()->field(0), chunked);
  std::vector<std::shared_ptr<::arrow::Column>> columns = {chunked_col};
  auto chunked_table = Table::Make(table->schema(), columns);

  ASSERT_TRUE(table->Equals(*chunked_table));
}
1758 | |
// Factory signature used by the chunked-column tests below: fills in a data
// type and an array of the requested length.
typedef std::function<void(int, std::shared_ptr<::DataType>*, std::shared_ptr<Array>*)>
    ArrayFactory;
1761 | |
1762 | template <typename ArrowType> |
1763 | struct GenerateArrayFunctor { |
1764 | explicit GenerateArrayFunctor(double pct_null = 0.1) : pct_null(pct_null) {} |
1765 | |
1766 | void operator()(int length, std::shared_ptr<::DataType>* type, |
1767 | std::shared_ptr<Array>* array) { |
1768 | using T = typename ArrowType::c_type; |
1769 | |
1770 | // TODO(wesm): generate things other than integers |
1771 | std::vector<T> draws; |
1772 | randint(length, 0, 100, &draws); |
1773 | |
1774 | std::vector<bool> is_valid; |
1775 | random_is_valid(length, this->pct_null, &is_valid); |
1776 | |
1777 | *type = ::arrow::TypeTraits<ArrowType>::type_singleton(); |
1778 | ::arrow::ArrayFromVector<ArrowType, T>(*type, is_valid, draws, array); |
1779 | } |
1780 | |
1781 | double pct_null; |
1782 | }; |
1783 | |
// NOTE: a second, identical `ArrayFactory` typedef used to live here; it was
// an exact duplicate of the alias declared above and has been removed.
1786 | |
// ArrayFactory producing a flat int32 array via GenerateArrayFunctor
// (~10% nulls by default).
auto GenerateInt32 = [](int length, std::shared_ptr<::DataType>* type,
                        std::shared_ptr<Array>* array) {
  GenerateArrayFunctor<::arrow::Int32Type> func;
  func(length, type, array);
};
1792 | |
// ArrayFactory producing a list<int8> array with list lengths up to 100.
auto GenerateList = [](int length, std::shared_ptr<::DataType>* type,
                       std::shared_ptr<Array>* array) {
  MakeListArray(length, 100, type, array);
};
1797 | |
1798 | TEST(TestArrowReadWrite, TableWithChunkedColumns) { |
1799 | std::vector<ArrayFactory> functions = {GenerateInt32, GenerateList}; |
1800 | |
1801 | std::vector<int> chunk_sizes = {2, 4, 10, 2}; |
1802 | const int64_t total_length = 18; |
1803 | |
1804 | for (const auto& datagen_func : functions) { |
1805 | ::arrow::ArrayVector arrays; |
1806 | std::shared_ptr<Array> arr; |
1807 | std::shared_ptr<::DataType> type; |
1808 | datagen_func(total_length, &type, &arr); |
1809 | |
1810 | int64_t offset = 0; |
1811 | for (int chunk_size : chunk_sizes) { |
1812 | arrays.push_back(arr->Slice(offset, chunk_size)); |
1813 | offset += chunk_size; |
1814 | } |
1815 | |
1816 | auto field = ::arrow::field("fname" , type); |
1817 | auto schema = ::arrow::schema({field}); |
1818 | auto col = std::make_shared<::arrow::Column>(field, arrays); |
1819 | auto table = Table::Make(schema, {col}); |
1820 | |
1821 | ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, 2)); |
1822 | ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, 3)); |
1823 | ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, 10)); |
1824 | } |
1825 | } |
1826 | |
1827 | TEST(TestArrowReadWrite, TableWithDuplicateColumns) { |
1828 | // See ARROW-1974 |
1829 | using ::arrow::ArrayFromVector; |
1830 | |
1831 | auto f0 = field("duplicate" , ::arrow::int8()); |
1832 | auto f1 = field("duplicate" , ::arrow::int16()); |
1833 | auto schema = ::arrow::schema({f0, f1}); |
1834 | |
1835 | std::vector<int8_t> a0_values = {1, 2, 3}; |
1836 | std::vector<int16_t> a1_values = {14, 15, 16}; |
1837 | |
1838 | std::shared_ptr<Array> a0, a1; |
1839 | |
1840 | ArrayFromVector<::arrow::Int8Type, int8_t>(a0_values, &a0); |
1841 | ArrayFromVector<::arrow::Int16Type, int16_t>(a1_values, &a1); |
1842 | |
1843 | auto table = Table::Make(schema, {std::make_shared<Column>(f0->name(), a0), |
1844 | std::make_shared<Column>(f1->name(), a1)}); |
1845 | ASSERT_NO_FATAL_FAILURE(CheckSimpleRoundtrip(table, table->num_rows())); |
1846 | } |
1847 | |
1848 | TEST(TestArrowReadWrite, DictionaryColumnChunkedWrite) { |
1849 | // This is a regression test for this: |
1850 | // |
1851 | // https://issues.apache.org/jira/browse/ARROW-1938 |
1852 | // |
1853 | // As of the writing of this test, columns of type |
1854 | // dictionary are written as their raw/expanded values. |
1855 | // The regression was that the whole column was being |
1856 | // written for each chunk. |
1857 | using ::arrow::ArrayFromVector; |
1858 | |
1859 | std::vector<std::string> values = {"first" , "second" , "third" }; |
1860 | auto type = ::arrow::utf8(); |
1861 | std::shared_ptr<Array> dict_values; |
1862 | ArrayFromVector<::arrow::StringType, std::string>(values, &dict_values); |
1863 | |
1864 | auto dict_type = ::arrow::dictionary(::arrow::int32(), dict_values); |
1865 | auto f0 = field("dictionary" , dict_type); |
1866 | std::vector<std::shared_ptr<::arrow::Field>> fields; |
1867 | fields.emplace_back(f0); |
1868 | auto schema = ::arrow::schema(fields); |
1869 | |
1870 | std::shared_ptr<Array> f0_values, f1_values; |
1871 | ArrayFromVector<::arrow::Int32Type, int32_t>({0, 1, 0, 2, 1}, &f0_values); |
1872 | ArrayFromVector<::arrow::Int32Type, int32_t>({2, 0, 1, 0, 2}, &f1_values); |
1873 | ::arrow::ArrayVector dict_arrays = { |
1874 | std::make_shared<::arrow::DictionaryArray>(dict_type, f0_values), |
1875 | std::make_shared<::arrow::DictionaryArray>(dict_type, f1_values)}; |
1876 | |
1877 | std::vector<std::shared_ptr<::arrow::Column>> columns; |
1878 | auto column = MakeColumn("column" , dict_arrays, false); |
1879 | columns.emplace_back(column); |
1880 | |
1881 | auto table = Table::Make(schema, columns); |
1882 | |
1883 | std::shared_ptr<Table> result; |
1884 | DoSimpleRoundtrip(table, 1, |
1885 | // Just need to make sure that we make |
1886 | // a chunk size that is smaller than the |
1887 | // total number of values |
1888 | 2, {}, &result); |
1889 | |
1890 | std::vector<std::string> expected_values = {"first" , "second" , "first" , "third" , |
1891 | "second" , "third" , "first" , "second" , |
1892 | "first" , "third" }; |
1893 | columns.clear(); |
1894 | |
1895 | std::shared_ptr<Array> expected_array; |
1896 | ArrayFromVector<::arrow::StringType, std::string>(expected_values, &expected_array); |
1897 | |
1898 | // The column name gets changed on output to the name of the |
1899 | // field, and it also turns into a nullable column |
1900 | columns.emplace_back(MakeColumn("dictionary" , expected_array, true)); |
1901 | |
1902 | fields.clear(); |
1903 | fields.emplace_back(::arrow::field("dictionary" , ::arrow::utf8())); |
1904 | schema = ::arrow::schema(fields); |
1905 | |
1906 | auto expected_table = Table::Make(schema, columns); |
1907 | |
1908 | ::arrow::AssertTablesEqual(*expected_table, *result, false); |
1909 | } |
1910 | |
1911 | TEST(TestArrowWrite, CheckChunkSize) { |
1912 | const int num_columns = 2; |
1913 | const int num_rows = 128; |
1914 | const int64_t chunk_size = 0; // note the chunk_size is 0 |
1915 | std::shared_ptr<Table> table; |
1916 | ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); |
1917 | |
1918 | auto sink = std::make_shared<InMemoryOutputStream>(); |
1919 | |
1920 | ASSERT_RAISES(Invalid, |
1921 | WriteTable(*table, ::arrow::default_memory_pool(), sink, chunk_size)); |
1922 | } |
1923 | |
// Fixture for reading nested (group/struct) Parquet schemas into Arrow.
// Files are written with the low-level Parquet column-writer API (nested
// writes are not yet implemented in the Arrow layer) and read back through
// the Arrow FileReader.
class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
 protected:
  // make it *3 to make it easily divisible by 3
  const int NUM_SIMPLE_TEST_ROWS = SMALL_SIZE * 3;
  // The leaf values written to every column; doubles as the expected data
  // when validating what is read back.
  std::shared_ptr<::arrow::Int32Array> values_array_ = nullptr;

  // Open reader_ over the Parquet bytes accumulated in nested_parquet_.
  void InitReader() {
    std::shared_ptr<Buffer> buffer = nested_parquet_->GetBuffer();
    ASSERT_OK_NO_THROW(
        OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
                 ::parquet::default_reader_properties(), nullptr, &reader_));
  }

  // Start a fresh in-memory Parquet file with the given schema and append a
  // single row group for writing. (num_rows is currently unused here; data is
  // supplied per column via WriteColumnData.)
  void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
    nested_parquet_ = std::make_shared<InMemoryOutputStream>();

    writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, schema,
                                               default_writer_properties());
    row_group_writer_ = writer_->AppendRowGroup();
  }

  // Close the row group and the file so the output buffer is complete.
  void FinalizeParquetFile() {
    row_group_writer_->Close();
    writer_->Close();
  }

  // Generate num_rows int32 values (no nulls) into values_array_.
  void MakeValues(int num_rows) {
    std::shared_ptr<Array> arr;
    ASSERT_OK(NullableArray<::arrow::Int32Type>(num_rows, 0, kDefaultSeed, &arr));
    values_array_ = std::dynamic_pointer_cast<::arrow::Int32Array>(arr);
  }

  // Write one int32 leaf column to the current row group with the given
  // definition and repetition levels.
  void WriteColumnData(size_t num_rows, int16_t* def_levels, int16_t* rep_levels,
                       int32_t* values) {
    auto typed_writer =
        static_cast<TypedColumnWriter<Int32Type>*>(row_group_writer_->NextColumn());
    typed_writer->WriteBatch(num_rows, def_levels, rep_levels, values);
  }

  // Check that `array` has the expected length and null count; the null
  // count is also re-derived slot by slot rather than trusting null_count().
  void ValidateArray(const Array& array, size_t expected_nulls) {
    ASSERT_EQ(array.length(), values_array_->length());
    ASSERT_EQ(array.null_count(), expected_nulls);
    // Also independently count the nulls
    auto local_null_count = 0;
    for (int i = 0; i < array.length(); i++) {
      if (array.IsNull(i)) {
        local_null_count++;
      }
    }
    ASSERT_EQ(local_null_count, expected_nulls);
  }

  // Validate a leaf column: its non-null slots must match values_array_ in
  // order. The separate index j compacts over nulls — presumably because the
  // writer consumes a value only for defined slots (see WriteColumnData).
  void ValidateColumnArray(const ::arrow::Int32Array& array, size_t expected_nulls) {
    ValidateArray(array, expected_nulls);
    int j = 0;
    for (int i = 0; i < values_array_->length(); i++) {
      if (array.IsNull(i)) {
        continue;
      }
      ASSERT_EQ(array.Value(i), values_array_->Value(j));
      j++;
    }
  }

  // Every column's field must agree with the table schema and with the type
  // of the column's first chunk.
  void ValidateTableArrayTypes(const Table& table) {
    for (int i = 0; i < table.num_columns(); i++) {
      const std::shared_ptr<::arrow::Field> schema_field = table.schema()->field(i);
      const std::shared_ptr<Column> column = table.column(i);
      // Compare with the column field
      ASSERT_TRUE(schema_field->Equals(column->field()));
      // Compare with the array type
      ASSERT_TRUE(schema_field->type()->Equals(column->data()->chunk(0)->type()));
    }
  }

  // A parquet with a simple nested schema
  void CreateSimpleNestedParquet(Repetition::type struct_repetition) {
    std::vector<NodePtr> parquet_fields;
    // TODO(itaiin): We are using parquet low-level file api to create the nested parquet
    // this needs to change when a nested writes are implemented

    // create the schema:
    // <struct_repetition> group group1 {
    //   required int32 leaf1;
    //   optional int32 leaf2;
    // }
    // required int32 leaf3;

    parquet_fields.push_back(GroupNode::Make(
        "group1", struct_repetition,
        {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT32),
         PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32)}));
    parquet_fields.push_back(
        PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT32));

    auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);

    // Create definition levels for the different columns that contain interleaved
    // nulls and values at all nesting levels

    // definition levels for optional fields
    std::vector<int16_t> leaf1_def_levels(NUM_SIMPLE_TEST_ROWS);
    std::vector<int16_t> leaf2_def_levels(NUM_SIMPLE_TEST_ROWS);
    std::vector<int16_t> leaf3_def_levels(NUM_SIMPLE_TEST_ROWS);
    for (int i = 0; i < NUM_SIMPLE_TEST_ROWS; i++) {
      // leaf1 is required within the optional group1, so it is only null
      // when the group is null
      leaf1_def_levels[i] = (i % 3 == 0) ? 0 : 1;
      // leaf2 is optional, can be null in the primitive (def-level 1) or
      // struct level (def-level 0)
      leaf2_def_levels[i] = static_cast<int16_t>(i % 3);
      // leaf3 is required
      leaf3_def_levels[i] = 0;
    }

    // No repeated fields anywhere, so every repetition level is 0.
    std::vector<int16_t> rep_levels(NUM_SIMPLE_TEST_ROWS, 0);

    // Produce values for the columns
    MakeValues(NUM_SIMPLE_TEST_ROWS);
    int32_t* values = reinterpret_cast<int32_t*>(values_array_->values()->mutable_data());

    // Create the actual parquet file
    InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node),
                       NUM_SIMPLE_TEST_ROWS);

    // leaf1 column
    WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf1_def_levels.data(), rep_levels.data(),
                    values);
    // leaf2 column
    WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf2_def_levels.data(), rep_levels.data(),
                    values);
    // leaf3 column
    WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf3_def_levels.data(), rep_levels.data(),
                    values);

    FinalizeParquetFile();
    InitReader();
  }

  // Build a group node of the given depth where every group has
  // `num_children` children and the leaves are primitives of `leaf_type`.
  // Node names encode depth and index (group-<depth>-<index>).
  NodePtr CreateSingleTypedNestedGroup(int index, int depth, int num_children,
                                       Repetition::type node_repetition,
                                       ParquetType::type leaf_type) {
    std::vector<NodePtr> children;

    for (int i = 0; i < num_children; i++) {
      if (depth <= 1) {
        children.push_back(PrimitiveNode::Make("leaf", node_repetition, leaf_type));
      } else {
        children.push_back(CreateSingleTypedNestedGroup(i, depth - 1, num_children,
                                                        node_repetition, leaf_type));
      }
    }

    std::stringstream ss;
    ss << "group-" << depth << "-" << index;
    return NodePtr(GroupNode::Make(ss.str(), node_repetition, children));
  }

  // A deeply nested schema
  void CreateMultiLevelNestedParquet(int num_trees, int tree_depth, int num_children,
                                     int num_rows, Repetition::type node_repetition) {
    // Create the schema
    std::vector<NodePtr> parquet_fields;
    for (int i = 0; i < num_trees; i++) {
      parquet_fields.push_back(CreateSingleTypedNestedGroup(
          i, tree_depth, num_children, node_repetition, ParquetType::INT32));
    }
    auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);

    // Each tree fans out to num_children^tree_depth leaf columns.
    int num_columns = num_trees * static_cast<int>((std::pow(num_children, tree_depth)));

    std::vector<int16_t> def_levels;
    std::vector<int16_t> rep_levels;

    // Cycle definition levels 0..tree_depth+1 so nulls appear at every
    // nesting level when the nodes are optional.
    int num_levels = 0;
    while (num_levels < num_rows) {
      if (node_repetition == Repetition::REQUIRED) {
        def_levels.push_back(0);  // all are required
      } else {
        int16_t level = static_cast<int16_t>(num_levels % (tree_depth + 2));
        def_levels.push_back(level);  // all are optional
      }
      rep_levels.push_back(0);  // none is repeated
      ++num_levels;
    }

    // Produce values for the columns
    MakeValues(num_rows);
    int32_t* values = reinterpret_cast<int32_t*>(values_array_->values()->mutable_data());

    // Create the actual parquet file
    InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node), num_rows);

    // Every leaf column receives the same values and levels.
    for (int i = 0; i < num_columns; i++) {
      WriteColumnData(num_rows, def_levels.data(), rep_levels.data(), values);
    }
    FinalizeParquetFile();
    InitReader();
  }

  // Walks a nested array read back from a multi-level file and checks leaf
  // data / null counts against the repetition used when writing.
  class DeepParquetTestVisitor : public ArrayVisitor {
   public:
    DeepParquetTestVisitor(Repetition::type node_repetition,
                           std::shared_ptr<::arrow::Int32Array> expected)
        : node_repetition_(node_repetition), expected_(expected) {}

    Status Validate(std::shared_ptr<Array> tree) { return tree->Accept(this); }

    virtual Status Visit(const ::arrow::Int32Array& array) {
      if (node_repetition_ == Repetition::REQUIRED) {
        // Required leaves must match the written values exactly.
        if (!array.Equals(expected_)) {
          return Status::Invalid("leaf array data mismatch");
        }
      } else if (node_repetition_ == Repetition::OPTIONAL) {
        if (array.length() != expected_->length()) {
          return Status::Invalid("Bad leaf array length");
        }
        // expect only 1 value every `depth` row
        if (array.null_count() != SMALL_SIZE) {
          return Status::Invalid("Unexpected null count");
        }
      } else {
        return Status::NotImplemented("Unsupported repetition");
      }
      return Status::OK();
    }

    virtual Status Visit(const ::arrow::StructArray& array) {
      for (int32_t i = 0; i < array.num_fields(); ++i) {
        auto child = array.field(i);
        if (node_repetition_ == Repetition::REQUIRED) {
          // Required structs carry no nulls themselves; recurse into children.
          RETURN_NOT_OK(child->Accept(this));
        } else if (node_repetition_ == Repetition::OPTIONAL) {
          // Null count Must be a multiple of SMALL_SIZE
          if (array.null_count() % SMALL_SIZE != 0) {
            return Status::Invalid("Unexpected struct null count");
          }
        } else {
          return Status::NotImplemented("Unsupported repetition");
        }
      }
      return Status::OK();
    }

   private:
    Repetition::type node_repetition_;
    std::shared_ptr<::arrow::Int32Array> expected_;
  };

  std::shared_ptr<InMemoryOutputStream> nested_parquet_;  // written file bytes
  std::unique_ptr<FileReader> reader_;                    // reader over nested_parquet_
  std::unique_ptr<ParquetFileWriter> writer_;
  RowGroupWriter* row_group_writer_;  // owned by writer_
};
2178 | |
2179 | TEST_F(TestNestedSchemaRead, ReadIntoTableFull) { |
2180 | ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::OPTIONAL)); |
2181 | |
2182 | std::shared_ptr<Table> table; |
2183 | ASSERT_OK_NO_THROW(reader_->ReadTable(&table)); |
2184 | ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); |
2185 | ASSERT_EQ(table->num_columns(), 2); |
2186 | ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2); |
2187 | ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table)); |
2188 | |
2189 | auto struct_field_array = |
2190 | std::static_pointer_cast<::arrow::StructArray>(table->column(0)->data()->chunk(0)); |
2191 | auto leaf1_array = |
2192 | std::static_pointer_cast<::arrow::Int32Array>(struct_field_array->field(0)); |
2193 | auto leaf2_array = |
2194 | std::static_pointer_cast<::arrow::Int32Array>(struct_field_array->field(1)); |
2195 | auto leaf3_array = |
2196 | std::static_pointer_cast<::arrow::Int32Array>(table->column(1)->data()->chunk(0)); |
2197 | |
2198 | // validate struct and leaf arrays |
2199 | |
2200 | // validate struct array |
2201 | ASSERT_NO_FATAL_FAILURE(ValidateArray(*struct_field_array, NUM_SIMPLE_TEST_ROWS / 3)); |
2202 | // validate leaf1 |
2203 | ASSERT_NO_FATAL_FAILURE(ValidateColumnArray(*leaf1_array, NUM_SIMPLE_TEST_ROWS / 3)); |
2204 | // validate leaf2 |
2205 | ASSERT_NO_FATAL_FAILURE( |
2206 | ValidateColumnArray(*leaf2_array, NUM_SIMPLE_TEST_ROWS * 2 / 3)); |
2207 | // validate leaf3 |
2208 | ASSERT_NO_FATAL_FAILURE(ValidateColumnArray(*leaf3_array, 0)); |
2209 | } |
2210 | |
// Reads subsets of the flattened parquet leaf columns (by index) and checks
// that struct parents are reconstructed holding only the selected children,
// in the requested column order. Leaf indices: 0=group1.leaf1,
// 1=group1.leaf2, 2=leaf3.
TEST_F(TestNestedSchemaRead, ReadTablePartial) {
  ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::OPTIONAL));
  std::shared_ptr<Table> table;

  // columns: {group1.leaf1, leaf3}
  ASSERT_OK_NO_THROW(reader_->ReadTable({0, 2}, &table));
  ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
  ASSERT_EQ(table->num_columns(), 2);
  ASSERT_EQ(table->schema()->field(0)->name(), "group1" );
  ASSERT_EQ(table->schema()->field(1)->name(), "leaf3" );
  // group1 keeps only the one selected child (leaf1)
  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1);
  ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));

  // columns: {group1.leaf1, group1.leaf2}
  // both children share one parent, so the result is a single struct column
  ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table));
  ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
  ASSERT_EQ(table->num_columns(), 1);
  ASSERT_EQ(table->schema()->field(0)->name(), "group1" );
  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
  ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));

  // columns: {leaf3} — a flat column comes back with no children
  ASSERT_OK_NO_THROW(reader_->ReadTable({2}, &table));
  ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
  ASSERT_EQ(table->num_columns(), 1);
  ASSERT_EQ(table->schema()->field(0)->name(), "leaf3" );
  ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 0);
  ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));

  // Test with different ordering — output column order follows the request
  ASSERT_OK_NO_THROW(reader_->ReadTable({2, 0}, &table));
  ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS);
  ASSERT_EQ(table->num_columns(), 2);
  ASSERT_EQ(table->schema()->field(0)->name(), "leaf3" );
  ASSERT_EQ(table->schema()->field(1)->name(), "group1" );
  ASSERT_EQ(table->schema()->field(1)->type()->num_children(), 1);
  ASSERT_NO_FATAL_FAILURE(ValidateTableArrayTypes(*table));
}
2249 | |
// A REPEATED group (a list of structs) is not supported by the reader yet:
// ReadTable must surface NotImplemented rather than crash or misread.
TEST_F(TestNestedSchemaRead, StructAndListTogetherUnsupported) {
  ASSERT_NO_FATAL_FAILURE(CreateSimpleNestedParquet(Repetition::REPEATED));
  std::shared_ptr<Table> table;
  ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table));
}
2255 | |
// Round-trips `num_trees` columns of `depth`-deep nested struct trees (each
// node with `num_children` children) and validates every resulting column
// with DeepParquetTestVisitor. Parameterized on node repetition
// (REQUIRED / OPTIONAL — see INSTANTIATE_TEST_CASE_P below).
TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
#ifdef PARQUET_VALGRIND
  // Smaller trees under valgrind to keep the runtime reasonable
  const int num_trees = 3;
  const int depth = 3;
#else
  const int num_trees = 5;
  const int depth = 5;
#endif
  const int num_children = 3;
  int num_rows = SMALL_SIZE * (depth + 2);
  ASSERT_NO_FATAL_FAILURE(CreateMultiLevelNestedParquet(num_trees, depth, num_children,
                                                        num_rows, GetParam()));
  std::shared_ptr<Table> table;
  ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
  ASSERT_EQ(table->num_columns(), num_trees);
  ASSERT_EQ(table->num_rows(), num_rows);

  DeepParquetTestVisitor visitor(GetParam(), values_array_);
  for (int i = 0; i < table->num_columns(); i++) {
    // Validate the first (only) chunk of each tree column
    auto tree = table->column(i)->data()->chunk(0);
    ASSERT_OK_NO_THROW(visitor.Validate(tree));
  }
}
2279 | |
// Run the parameterized nested-schema tests for both REQUIRED and OPTIONAL
// nodes (REPEATED is covered separately and expects NotImplemented).
INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
                        ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
2282 | |
2283 | TEST(TestImpalaConversion, ArrowTimestampToImpalaTimestamp) { |
2284 | // June 20, 2017 16:32:56 and 123456789 nanoseconds |
2285 | int64_t nanoseconds = INT64_C(1497976376123456789); |
2286 | |
2287 | Int96 calculated; |
2288 | |
2289 | Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}}; |
2290 | internal::NanosecondsToImpalaTimestamp(nanoseconds, &calculated); |
2291 | ASSERT_EQ(expected, calculated); |
2292 | } |
2293 | |
2294 | void TryReadDataFile(const std::string& testing_file_path, bool should_succeed = true) { |
2295 | std::string dir_string(test::get_data_dir()); |
2296 | std::stringstream ss; |
2297 | ss << dir_string << "/" << testing_file_path; |
2298 | auto path = ss.str(); |
2299 | |
2300 | auto pool = ::arrow::default_memory_pool(); |
2301 | |
2302 | std::unique_ptr<FileReader> arrow_reader; |
2303 | try { |
2304 | arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false))); |
2305 | std::shared_ptr<::arrow::Table> table; |
2306 | ASSERT_OK(arrow_reader->ReadTable(&table)); |
2307 | } catch (const ParquetException& e) { |
2308 | if (should_succeed) { |
2309 | FAIL() << "Exception thrown when reading file: " << e.what(); |
2310 | } |
2311 | } |
2312 | } |
2313 | |
// Regression test: reading this Int96-bearing file must succeed (and, under
// sanitizers/valgrind, must not perform the out-of-bounds access from the
// linked report).
TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
  // PARQUET-995
  TryReadDataFile("alltypes_plain.parquet" );
}
2318 | |
// Regression test: a file with a corrupted schema must fail to read
// gracefully (should_succeed = false) instead of crashing.
TEST(TestArrowReaderAdHoc, CorruptedSchema) {
  // PARQUET-1481
  TryReadDataFile("bad_data/PARQUET-1481.parquet" , false /* should_succeed */);
}
2323 | |
// Fixture parameterized over (test data file name, expected arrow decimal
// type); instantiated below with decimal files using different physical
// encodings (int32, int64, fixed-length byte array, byte array).
class TestArrowReaderAdHocSparkAndHvr
    : public ::testing::TestWithParam<
          std::tuple<std::string, std::shared_ptr<::DataType>>> {};
2327 | |
2328 | TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) { |
2329 | std::string path(test::get_data_dir()); |
2330 | |
2331 | std::string filename; |
2332 | std::shared_ptr<::DataType> decimal_type; |
2333 | std::tie(filename, decimal_type) = GetParam(); |
2334 | |
2335 | path += "/" + filename; |
2336 | ASSERT_GT(path.size(), 0); |
2337 | |
2338 | auto pool = ::arrow::default_memory_pool(); |
2339 | |
2340 | std::unique_ptr<FileReader> arrow_reader; |
2341 | ASSERT_NO_THROW( |
2342 | arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false)))); |
2343 | std::shared_ptr<::arrow::Table> table; |
2344 | ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table)); |
2345 | |
2346 | ASSERT_EQ(1, table->num_columns()); |
2347 | |
2348 | constexpr int32_t expected_length = 24; |
2349 | |
2350 | auto value_column = table->column(0); |
2351 | ASSERT_EQ(expected_length, value_column->length()); |
2352 | |
2353 | auto raw_array = value_column->data(); |
2354 | ASSERT_EQ(1, raw_array->num_chunks()); |
2355 | |
2356 | auto chunk = raw_array->chunk(0); |
2357 | |
2358 | std::shared_ptr<Array> expected_array; |
2359 | |
2360 | ::arrow::Decimal128Builder builder(decimal_type, pool); |
2361 | |
2362 | for (int32_t i = 0; i < expected_length; ++i) { |
2363 | ::arrow::Decimal128 value((i + 1) * 100); |
2364 | ASSERT_OK(builder.Append(value)); |
2365 | } |
2366 | ASSERT_OK(builder.Finish(&expected_array)); |
2367 | |
2368 | AssertArraysEqual(*expected_array, *chunk); |
2369 | } |
2370 | |
// Decimal files from Spark/HVR covering each physical representation:
// int32, int64, fixed-length byte array (modern and legacy), byte array —
// paired with the arrow decimal(precision, scale) type each should decode to.
INSTANTIATE_TEST_CASE_P(
    ReadDecimals, TestArrowReaderAdHocSparkAndHvr,
    ::testing::Values(
        std::make_tuple("int32_decimal.parquet" , ::arrow::decimal(4, 2)),
        std::make_tuple("int64_decimal.parquet" , ::arrow::decimal(10, 2)),
        std::make_tuple("fixed_length_decimal.parquet" , ::arrow::decimal(25, 2)),
        std::make_tuple("fixed_length_decimal_legacy.parquet" , ::arrow::decimal(13, 2)),
        std::make_tuple("byte_array_decimal.parquet" , ::arrow::decimal(4, 2))));
2379 | |
2380 | } // namespace arrow |
2381 | |
2382 | } // namespace parquet |
2383 | |