1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <memory>
19#include <sstream>
20#include <string>
21
22#include <gtest/gtest.h>
23
24#include "arrow/array.h"
25#include "arrow/io/memory.h"
26#include "arrow/ipc/feather-internal.h"
27#include "arrow/ipc/feather_generated.h"
28#include "arrow/ipc/test-common.h"
29#include "arrow/memory_pool.h"
30#include "arrow/pretty_print.h"
31#include "arrow/record_batch.h"
32#include "arrow/status.h"
33#include "arrow/table.h"
34#include "arrow/test-util.h"
35#include "arrow/type.h"
36#include "arrow/util/checked_cast.h"
37
38namespace arrow {
39
40class Buffer;
41
42using internal::checked_cast;
43
44namespace ipc {
45namespace feather {
46
// Asserts that `left` and `right` have the same size and hold equal elements
// at every position. The assertions are fatal, so the helper returns at the
// first mismatch; for an element mismatch the index is appended to the message.
template <typename T>
inline void assert_vector_equal(const std::vector<T>& left, const std::vector<T>& right) {
  ASSERT_EQ(left.size(), right.size());

  const size_t count = left.size();
  for (size_t i = 0; i != count; ++i) {
    ASSERT_EQ(left[i], right[i]) << i;
  }
}
55
56class TestTableBuilder : public ::testing::Test {
57 public:
58 void SetUp() { tb_.reset(new TableBuilder(1000)); }
59
60 virtual void Finish() {
61 ASSERT_OK(tb_->Finish());
62
63 table_.reset(new TableMetadata());
64 ASSERT_OK(table_->Open(tb_->GetBuffer()));
65 }
66
67 protected:
68 std::unique_ptr<TableBuilder> tb_;
69 std::unique_ptr<TableMetadata> table_;
70};
71
72TEST_F(TestTableBuilder, Version) {
73 Finish();
74 ASSERT_EQ(kFeatherVersion, table_->version());
75}
76
77TEST_F(TestTableBuilder, EmptyTable) {
78 Finish();
79
80 ASSERT_FALSE(table_->HasDescription());
81 ASSERT_EQ("", table_->GetDescription());
82 ASSERT_EQ(1000, table_->num_rows());
83 ASSERT_EQ(0, table_->num_columns());
84}
85
86TEST_F(TestTableBuilder, SetDescription) {
87 std::string desc("this is some good data");
88 tb_->SetDescription(desc);
89 Finish();
90 ASSERT_TRUE(table_->HasDescription());
91 ASSERT_EQ(desc, table_->GetDescription());
92}
93
94void AssertArrayEquals(const ArrayMetadata& left, const ArrayMetadata& right) {
95 EXPECT_EQ(left.type, right.type);
96 EXPECT_EQ(left.offset, right.offset);
97 EXPECT_EQ(left.length, right.length);
98 EXPECT_EQ(left.null_count, right.null_count);
99 EXPECT_EQ(left.total_bytes, right.total_bytes);
100}
101
102TEST_F(TestTableBuilder, AddPrimitiveColumn) {
103 std::unique_ptr<ColumnBuilder> cb = tb_->AddColumn("f0");
104
105 ArrayMetadata values1;
106 ArrayMetadata values2;
107 values1.type = fbs::Type_INT32;
108 values1.offset = 10000;
109 values1.length = 1000;
110 values1.null_count = 100;
111 values1.total_bytes = 4000;
112
113 cb->SetValues(values1);
114
115 std::string user_meta = "as you wish";
116 cb->SetUserMetadata(user_meta);
117
118 ASSERT_OK(cb->Finish());
119
120 cb = tb_->AddColumn("f1");
121
122 values2.type = fbs::Type_UTF8;
123 values2.offset = 14000;
124 values2.length = 1000;
125 values2.null_count = 100;
126 values2.total_bytes = 10000;
127
128 cb->SetValues(values2);
129 ASSERT_OK(cb->Finish());
130
131 Finish();
132
133 ASSERT_EQ(2, table_->num_columns());
134
135 auto col = table_->column(0);
136
137 ASSERT_EQ("f0", col->name()->str());
138 ASSERT_EQ(user_meta, col->user_metadata()->str());
139
140 ArrayMetadata values3;
141 FromFlatbuffer(col->values(), &values3);
142 AssertArrayEquals(values3, values1);
143
144 col = table_->column(1);
145 ASSERT_EQ("f1", col->name()->str());
146
147 ArrayMetadata values4;
148 FromFlatbuffer(col->values(), &values4);
149 AssertArrayEquals(values4, values2);
150}
151
152TEST_F(TestTableBuilder, AddCategoryColumn) {
153 ArrayMetadata values1(fbs::Type_UINT8, 10000, 1000, 100, 4000);
154 ArrayMetadata levels(fbs::Type_UTF8, 14000, 10, 0, 300);
155
156 std::unique_ptr<ColumnBuilder> cb = tb_->AddColumn("c0");
157 cb->SetValues(values1);
158 cb->SetCategory(levels);
159 ASSERT_OK(cb->Finish());
160
161 cb = tb_->AddColumn("c1");
162 cb->SetValues(values1);
163 cb->SetCategory(levels, true);
164 ASSERT_OK(cb->Finish());
165
166 Finish();
167
168 auto col = table_->column(0);
169 ASSERT_EQ(fbs::TypeMetadata_CategoryMetadata, col->metadata_type());
170
171 ArrayMetadata result;
172 FromFlatbuffer(col->values(), &result);
173 AssertArrayEquals(result, values1);
174
175 auto cat_ptr = static_cast<const fbs::CategoryMetadata*>(col->metadata());
176 ASSERT_FALSE(cat_ptr->ordered());
177
178 FromFlatbuffer(cat_ptr->levels(), &result);
179 AssertArrayEquals(result, levels);
180
181 col = table_->column(1);
182 cat_ptr = static_cast<const fbs::CategoryMetadata*>(col->metadata());
183 ASSERT_TRUE(cat_ptr->ordered());
184 FromFlatbuffer(cat_ptr->levels(), &result);
185 AssertArrayEquals(result, levels);
186}
187
188TEST_F(TestTableBuilder, AddTimestampColumn) {
189 ArrayMetadata values1(fbs::Type_INT64, 10000, 1000, 100, 4000);
190 std::unique_ptr<ColumnBuilder> cb = tb_->AddColumn("c0");
191 cb->SetValues(values1);
192 cb->SetTimestamp(TimeUnit::MILLI);
193 ASSERT_OK(cb->Finish());
194
195 cb = tb_->AddColumn("c1");
196
197 std::string tz("America/Los_Angeles");
198
199 cb->SetValues(values1);
200 cb->SetTimestamp(TimeUnit::SECOND, tz);
201 ASSERT_OK(cb->Finish());
202
203 Finish();
204
205 auto col = table_->column(0);
206
207 ASSERT_EQ(fbs::TypeMetadata_TimestampMetadata, col->metadata_type());
208
209 ArrayMetadata result;
210 FromFlatbuffer(col->values(), &result);
211 AssertArrayEquals(result, values1);
212
213 auto ts_ptr = static_cast<const fbs::TimestampMetadata*>(col->metadata());
214 ASSERT_EQ(fbs::TimeUnit_MILLISECOND, ts_ptr->unit());
215
216 col = table_->column(1);
217 ts_ptr = static_cast<const fbs::TimestampMetadata*>(col->metadata());
218 ASSERT_EQ(fbs::TimeUnit_SECOND, ts_ptr->unit());
219 ASSERT_EQ(tz, ts_ptr->timezone()->str());
220}
221
222TEST_F(TestTableBuilder, AddDateColumn) {
223 ArrayMetadata values1(fbs::Type_INT64, 10000, 1000, 100, 4000);
224 std::unique_ptr<ColumnBuilder> cb = tb_->AddColumn("d0");
225 cb->SetValues(values1);
226 cb->SetDate();
227 ASSERT_OK(cb->Finish());
228
229 Finish();
230
231 auto col = table_->column(0);
232
233 ASSERT_EQ(fbs::TypeMetadata_DateMetadata, col->metadata_type());
234 ArrayMetadata result;
235 FromFlatbuffer(col->values(), &result);
236 AssertArrayEquals(result, values1);
237}
238
239TEST_F(TestTableBuilder, AddTimeColumn) {
240 ArrayMetadata values1(fbs::Type_INT64, 10000, 1000, 100, 4000);
241 std::unique_ptr<ColumnBuilder> cb = tb_->AddColumn("c0");
242 cb->SetValues(values1);
243 cb->SetTime(TimeUnit::SECOND);
244 ASSERT_OK(cb->Finish());
245 Finish();
246
247 auto col = table_->column(0);
248
249 ASSERT_EQ(fbs::TypeMetadata_TimeMetadata, col->metadata_type());
250 ArrayMetadata result;
251 FromFlatbuffer(col->values(), &result);
252 AssertArrayEquals(result, values1);
253
254 auto t_ptr = static_cast<const fbs::TimeMetadata*>(col->metadata());
255 ASSERT_EQ(fbs::TimeUnit_SECOND, t_ptr->unit());
256}
257
258void CheckArrays(const Array& expected, const Array& result) {
259 if (!result.Equals(expected)) {
260 std::stringstream pp_result;
261 std::stringstream pp_expected;
262
263 EXPECT_OK(PrettyPrint(result, 0, &pp_result));
264 EXPECT_OK(PrettyPrint(expected, 0, &pp_expected));
265 FAIL() << "Got: " << pp_result.str() << "\nExpected: " << pp_expected.str();
266 }
267}
268
269void CheckBatches(const RecordBatch& expected, const RecordBatch& result) {
270 if (!result.Equals(expected)) {
271 std::stringstream pp_result;
272 std::stringstream pp_expected;
273
274 EXPECT_OK(PrettyPrint(result, 0, &pp_result));
275 EXPECT_OK(PrettyPrint(expected, 0, &pp_expected));
276 FAIL() << "Got: " << pp_result.str() << "\nExpected: " << pp_expected.str();
277 }
278}
279
280class TestTableReader : public ::testing::Test {
281 public:
282 void SetUp() {
283 ASSERT_OK(io::BufferOutputStream::Create(1024, default_memory_pool(), &stream_));
284 ASSERT_OK(TableWriter::Open(stream_, &writer_));
285 }
286
287 void Finish() {
288 // Write table footer
289 ASSERT_OK(writer_->Finalize());
290
291 ASSERT_OK(stream_->Finish(&output_));
292
293 auto buffer = std::make_shared<io::BufferReader>(output_);
294 ASSERT_OK(TableReader::Open(buffer, &reader_));
295 }
296
297 protected:
298 std::shared_ptr<io::BufferOutputStream> stream_;
299 std::unique_ptr<TableWriter> writer_;
300 std::unique_ptr<TableReader> reader_;
301
302 std::shared_ptr<Buffer> output_;
303};
304
305TEST_F(TestTableReader, ReadIndices) {
306 std::shared_ptr<RecordBatch> batch1;
307 ASSERT_OK(MakeIntRecordBatch(&batch1));
308 std::shared_ptr<RecordBatch> batch2;
309 ASSERT_OK(MakeIntRecordBatch(&batch2));
310
311 ASSERT_OK(writer_->Append("f0", *batch1->column(0)));
312 ASSERT_OK(writer_->Append("f1", *batch1->column(1)));
313 ASSERT_OK(writer_->Append("f2", *batch2->column(0)));
314 ASSERT_OK(writer_->Append("f3", *batch2->column(1)));
315 Finish();
316
317 std::vector<int> indices({3, 0, 5});
318 std::shared_ptr<Table> result;
319 ASSERT_OK(reader_->Read(indices, &result));
320 std::vector<std::shared_ptr<Field>> fields;
321 std::vector<std::shared_ptr<Array>> arrays;
322 fields.push_back(std::make_shared<Field>("f0", int32()));
323 arrays.push_back(batch1->column(0));
324 fields.push_back(std::make_shared<Field>("f3", int32()));
325 arrays.push_back(batch2->column(1));
326 auto expected = Table::Make(std::make_shared<Schema>(fields), arrays);
327 AssertTablesEqual(*expected, *result);
328}
329
330TEST_F(TestTableReader, ReadNames) {
331 std::shared_ptr<RecordBatch> batch1;
332 ASSERT_OK(MakeIntRecordBatch(&batch1));
333 std::shared_ptr<RecordBatch> batch2;
334 ASSERT_OK(MakeIntRecordBatch(&batch2));
335
336 ASSERT_OK(writer_->Append("f0", *batch1->column(0)));
337 ASSERT_OK(writer_->Append("f1", *batch1->column(1)));
338 ASSERT_OK(writer_->Append("f2", *batch2->column(0)));
339 ASSERT_OK(writer_->Append("f3", *batch2->column(1)));
340 Finish();
341
342 std::vector<std::string> names({"f3", "f0", "f5"});
343 std::shared_ptr<Table> result;
344 ASSERT_OK(reader_->Read(names, &result));
345 std::vector<std::shared_ptr<Field>> fields;
346 std::vector<std::shared_ptr<Array>> arrays;
347 fields.push_back(std::make_shared<Field>("f0", int32()));
348 arrays.push_back(batch1->column(0));
349 fields.push_back(std::make_shared<Field>("f3", int32()));
350 arrays.push_back(batch2->column(1));
351 auto expected = Table::Make(std::make_shared<Schema>(fields), arrays);
352 AssertTablesEqual(*expected, *result);
353}
354
355class TestTableWriter : public ::testing::Test {
356 public:
357 void SetUp() {
358 ASSERT_OK(io::BufferOutputStream::Create(1024, default_memory_pool(), &stream_));
359 ASSERT_OK(TableWriter::Open(stream_, &writer_));
360 }
361
362 void Finish() {
363 // Write table footer
364 ASSERT_OK(writer_->Finalize());
365
366 ASSERT_OK(stream_->Finish(&output_));
367
368 auto buffer = std::make_shared<io::BufferReader>(output_);
369 ASSERT_OK(TableReader::Open(buffer, &reader_));
370 }
371
372 void CheckBatch(std::shared_ptr<RecordBatch> batch) {
373 std::shared_ptr<Table> table;
374 std::vector<std::shared_ptr<RecordBatch>> batches = {batch};
375 ASSERT_OK(Table::FromRecordBatches(batches, &table));
376 ASSERT_OK(writer_->Write(*table));
377 Finish();
378
379 std::shared_ptr<Table> read_table;
380 ASSERT_OK(reader_->Read(&read_table));
381 AssertTablesEqual(*table, *read_table);
382 }
383
384 protected:
385 std::shared_ptr<io::BufferOutputStream> stream_;
386 std::unique_ptr<TableWriter> writer_;
387 std::unique_ptr<TableReader> reader_;
388
389 std::shared_ptr<Buffer> output_;
390};
391
392TEST_F(TestTableWriter, EmptyTable) {
393 Finish();
394
395 ASSERT_FALSE(reader_->HasDescription());
396 ASSERT_EQ("", reader_->GetDescription());
397
398 ASSERT_EQ(0, reader_->num_rows());
399 ASSERT_EQ(0, reader_->num_columns());
400}
401
402TEST_F(TestTableWriter, SetNumRows) {
403 writer_->SetNumRows(1000);
404 Finish();
405 ASSERT_EQ(1000, reader_->num_rows());
406}
407
408TEST_F(TestTableWriter, SetDescription) {
409 std::string desc("contents of the file");
410 writer_->SetDescription(desc);
411 Finish();
412
413 ASSERT_TRUE(reader_->HasDescription());
414 ASSERT_EQ(desc, reader_->GetDescription());
415
416 ASSERT_EQ(0, reader_->num_rows());
417 ASSERT_EQ(0, reader_->num_columns());
418}
419
420TEST_F(TestTableWriter, PrimitiveRoundTrip) {
421 std::shared_ptr<RecordBatch> batch;
422 ASSERT_OK(MakeIntRecordBatch(&batch));
423
424 ASSERT_OK(writer_->Append("f0", *batch->column(0)));
425 ASSERT_OK(writer_->Append("f1", *batch->column(1)));
426 Finish();
427
428 std::shared_ptr<Column> col;
429 ASSERT_OK(reader_->GetColumn(0, &col));
430 ASSERT_TRUE(col->data()->chunk(0)->Equals(batch->column(0)));
431 ASSERT_EQ("f0", col->name());
432
433 ASSERT_OK(reader_->GetColumn(1, &col));
434 ASSERT_TRUE(col->data()->chunk(0)->Equals(batch->column(1)));
435 ASSERT_EQ("f1", col->name());
436}
437
438TEST_F(TestTableWriter, CategoryRoundtrip) {
439 std::shared_ptr<RecordBatch> batch;
440 ASSERT_OK(MakeDictionaryFlat(&batch));
441 CheckBatch(batch);
442}
443
444TEST_F(TestTableWriter, TimeTypes) {
445 std::vector<bool> is_valid = {true, true, true, false, true, true, true};
446 auto f0 = field("f0", date32());
447 auto f1 = field("f1", time32(TimeUnit::MILLI));
448 auto f2 = field("f2", timestamp(TimeUnit::NANO));
449 auto f3 = field("f3", timestamp(TimeUnit::SECOND, "US/Los_Angeles"));
450 auto schema = ::arrow::schema({f0, f1, f2, f3});
451
452 std::vector<int64_t> values64_vec = {0, 1, 2, 3, 4, 5, 6};
453 std::shared_ptr<Array> values64;
454 ArrayFromVector<Int64Type, int64_t>(is_valid, values64_vec, &values64);
455
456 std::vector<int32_t> values32_vec = {10, 11, 12, 13, 14, 15, 16};
457 std::shared_ptr<Array> values32;
458 ArrayFromVector<Int32Type, int32_t>(is_valid, values32_vec, &values32);
459
460 std::vector<int32_t> date_values_vec = {20, 21, 22, 23, 24, 25, 26};
461 std::shared_ptr<Array> date_array;
462 ArrayFromVector<Date32Type, int32_t>(is_valid, date_values_vec, &date_array);
463
464 const auto& prim_values64 = checked_cast<const PrimitiveArray&>(*values64);
465 BufferVector buffers64 = {prim_values64.null_bitmap(), prim_values64.values()};
466
467 const auto& prim_values32 = checked_cast<const PrimitiveArray&>(*values32);
468 BufferVector buffers32 = {prim_values32.null_bitmap(), prim_values32.values()};
469
470 // Push date32 ArrayData
471 std::vector<std::shared_ptr<ArrayData>> arrays;
472 arrays.push_back(date_array->data());
473
474 // Create time32 ArrayData
475 arrays.emplace_back(ArrayData::Make(schema->field(1)->type(), values32->length(),
476 BufferVector(buffers32), values32->null_count(),
477 0));
478
479 // Create timestamp ArrayData
480 for (int i = 2; i < schema->num_fields(); ++i) {
481 arrays.emplace_back(ArrayData::Make(schema->field(i)->type(), values64->length(),
482 BufferVector(buffers64), values64->null_count(),
483 0));
484 }
485
486 auto batch = RecordBatch::Make(schema, 7, std::move(arrays));
487 CheckBatch(batch);
488}
489
490TEST_F(TestTableWriter, VLenPrimitiveRoundTrip) {
491 std::shared_ptr<RecordBatch> batch;
492 ASSERT_OK(MakeStringTypesRecordBatch(&batch));
493 CheckBatch(batch);
494}
495
496TEST_F(TestTableWriter, PrimitiveNullRoundTrip) {
497 std::shared_ptr<RecordBatch> batch;
498 ASSERT_OK(MakeNullRecordBatch(&batch));
499
500 for (int i = 0; i < batch->num_columns(); ++i) {
501 ASSERT_OK(writer_->Append(batch->column_name(i), *batch->column(i)));
502 }
503 Finish();
504
505 std::shared_ptr<Column> col;
506 for (int i = 0; i < batch->num_columns(); ++i) {
507 ASSERT_OK(reader_->GetColumn(i, &col));
508 ASSERT_EQ(batch->column_name(i), col->name());
509 StringArray str_values(batch->column(i)->length(), nullptr, nullptr,
510 batch->column(i)->null_bitmap(),
511 batch->column(i)->null_count());
512 CheckArrays(str_values, *col->data()->chunk(0));
513 }
514}
515
516class TestTableWriterSlice : public TestTableWriter,
517 public ::testing::WithParamInterface<std::tuple<int, int>> {
518 public:
519 void CheckSlice(std::shared_ptr<RecordBatch> batch) {
520 auto p = GetParam();
521 auto start = std::get<0>(p);
522 auto size = std::get<1>(p);
523
524 batch = batch->Slice(start, size);
525
526 ASSERT_OK(writer_->Append("f0", *batch->column(0)));
527 ASSERT_OK(writer_->Append("f1", *batch->column(1)));
528 Finish();
529
530 std::shared_ptr<Column> col;
531 ASSERT_OK(reader_->GetColumn(0, &col));
532 ASSERT_TRUE(col->data()->chunk(0)->Equals(batch->column(0)));
533 ASSERT_EQ("f0", col->name());
534
535 ASSERT_OK(reader_->GetColumn(1, &col));
536 ASSERT_TRUE(col->data()->chunk(0)->Equals(batch->column(1)));
537 ASSERT_EQ("f1", col->name());
538 }
539};
540
541TEST_P(TestTableWriterSlice, SliceRoundTrip) {
542 std::shared_ptr<RecordBatch> batch;
543 ASSERT_OK(MakeIntBatchSized(600, &batch));
544 CheckSlice(batch);
545}
546
547TEST_P(TestTableWriterSlice, SliceStringsRoundTrip) {
548 auto p = GetParam();
549 auto start = std::get<0>(p);
550 auto with_nulls = start % 2 == 0;
551 std::shared_ptr<RecordBatch> batch;
552 ASSERT_OK(MakeStringTypesRecordBatch(&batch, with_nulls));
553 CheckSlice(batch);
554}
555
556TEST_P(TestTableWriterSlice, SliceBooleanRoundTrip) {
557 std::shared_ptr<RecordBatch> batch;
558 ASSERT_OK(MakeBooleanBatchSized(600, &batch));
559 CheckSlice(batch);
560}
561
562INSTANTIATE_TEST_CASE_P(TestTableWriterSliceOffsets, TestTableWriterSlice,
563 ::testing::Combine(::testing::Values(0, 1, 300, 301, 302, 303,
564 304, 305, 306, 307),
565 ::testing::Values(0, 1, 7, 8, 30, 32, 100)));
566
567} // namespace feather
568} // namespace ipc
569} // namespace arrow
570