1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/ipc/feather.h"
19
20#include <cstdint>
21#include <cstring>
22#include <memory>
23#include <sstream> // IWYU pragma: keep
24#include <string>
25#include <utility>
26#include <vector>
27
28#include "flatbuffers/flatbuffers.h"
29
30#include "arrow/array.h"
31#include "arrow/buffer.h"
32#include "arrow/io/interfaces.h"
33#include "arrow/ipc/feather-internal.h"
34#include "arrow/ipc/feather_generated.h"
35#include "arrow/ipc/util.h" // IWYU pragma: keep
36#include "arrow/status.h"
37#include "arrow/table.h" // IWYU pragma: keep
38#include "arrow/type.h"
39#include "arrow/type_traits.h"
40#include "arrow/util/bit-util.h"
41#include "arrow/util/checked_cast.h"
42#include "arrow/util/logging.h"
43#include "arrow/visitor.h"
44
45namespace arrow {
46
47using internal::checked_cast;
48
49namespace ipc {
50namespace feather {
51
52static const uint8_t kPaddingBytes[kFeatherDefaultAlignment] = {0};
53
54static inline int64_t PaddedLength(int64_t nbytes) {
55 static const int64_t alignment = kFeatherDefaultAlignment;
56 return ((nbytes + alignment - 1) / alignment) * alignment;
57}
58
59// XXX: Hack for Feather 0.3.0 for backwards compatibility with old files
60// Size in-file of written byte buffer
61static int64_t GetOutputLength(int64_t nbytes) {
62 if (kFeatherVersion < 2) {
63 // Feather files < 0.3.0
64 return nbytes;
65 } else {
66 return PaddedLength(nbytes);
67 }
68}
69
70static Status WritePadded(io::OutputStream* stream, const uint8_t* data, int64_t length,
71 int64_t* bytes_written) {
72 RETURN_NOT_OK(stream->Write(data, length));
73
74 int64_t remainder = PaddedLength(length) - length;
75 if (remainder != 0) {
76 RETURN_NOT_OK(stream->Write(kPaddingBytes, remainder));
77 }
78 *bytes_written = length + remainder;
79 return Status::OK();
80}
81
82static Status WritePaddedWithOffset(io::OutputStream* stream, const uint8_t* data,
83 int64_t bit_offset, const int64_t length,
84 int64_t* bytes_written) {
85 data = data + bit_offset / 8;
86 uint8_t bit_shift = static_cast<uint8_t>(bit_offset % 8);
87 if (bit_offset == 0) {
88 RETURN_NOT_OK(stream->Write(data, length));
89 } else {
90 constexpr int64_t buffersize = 256;
91 uint8_t buffer[buffersize];
92 const uint8_t lshift = static_cast<uint8_t>(8 - bit_shift);
93 const uint8_t* buffer_end = buffer + buffersize;
94 uint8_t* buffer_it = buffer;
95
96 for (const uint8_t* end = data + length; data != end;) {
97 uint8_t r = static_cast<uint8_t>(*data++ >> bit_shift);
98 uint8_t l = static_cast<uint8_t>(*data << lshift);
99 uint8_t value = l | r;
100 *buffer_it++ = value;
101 if (buffer_it == buffer_end) {
102 RETURN_NOT_OK(stream->Write(buffer, buffersize));
103 buffer_it = buffer;
104 }
105 }
106 if (buffer_it != buffer) {
107 RETURN_NOT_OK(stream->Write(buffer, buffer_it - buffer));
108 }
109 }
110
111 int64_t remainder = PaddedLength(length) - length;
112 if (remainder != 0) {
113 RETURN_NOT_OK(stream->Write(kPaddingBytes, remainder));
114 }
115 *bytes_written = length + remainder;
116 return Status::OK();
117}
118
119/// For compability, we need to write any data sometimes just to keep producing
120/// files that can be read with an older reader.
121static Status WritePaddedBlank(io::OutputStream* stream, int64_t length,
122 int64_t* bytes_written) {
123 const uint8_t null = 0;
124 for (int64_t i = 0; i < length; i++) {
125 RETURN_NOT_OK(stream->Write(&null, 1));
126 }
127
128 int64_t remainder = PaddedLength(length) - length;
129 if (remainder != 0) {
130 RETURN_NOT_OK(stream->Write(kPaddingBytes, remainder));
131 }
132 *bytes_written = length + remainder;
133 return Status::OK();
134}
135
136// ----------------------------------------------------------------------
137// TableBuilder
138
139TableBuilder::TableBuilder(int64_t num_rows) : finished_(false), num_rows_(num_rows) {}
140
141FBB& TableBuilder::fbb() { return fbb_; }
142
143Status TableBuilder::Finish() {
144 if (finished_) {
145 return Status::Invalid("can only call this once");
146 }
147
148 FBString desc = 0;
149 if (!description_.empty()) {
150 desc = fbb_.CreateString(description_);
151 }
152
153 flatbuffers::Offset<flatbuffers::String> metadata = 0;
154
155 auto root = fbs::CreateCTable(fbb_, desc, num_rows_, fbb_.CreateVector(columns_),
156 kFeatherVersion, metadata);
157 fbb_.Finish(root);
158 finished_ = true;
159
160 return Status::OK();
161}
162
163std::shared_ptr<Buffer> TableBuilder::GetBuffer() const {
164 return std::make_shared<Buffer>(fbb_.GetBufferPointer(),
165 static_cast<int64_t>(fbb_.GetSize()));
166}
167
168void TableBuilder::SetDescription(const std::string& description) {
169 description_ = description;
170}
171
172void TableBuilder::SetNumRows(int64_t num_rows) { num_rows_ = num_rows; }
173
174void TableBuilder::add_column(const flatbuffers::Offset<fbs::Column>& col) {
175 columns_.push_back(col);
176}
177
178ColumnBuilder::ColumnBuilder(TableBuilder* parent, const std::string& name)
179 : parent_(parent) {
180 fbb_ = &parent->fbb();
181 name_ = name;
182 type_ = ColumnType::PRIMITIVE;
183 meta_time_.unit = TimeUnit::SECOND;
184}
185
186flatbuffers::Offset<void> ColumnBuilder::CreateColumnMetadata() {
187 switch (type_) {
188 case ColumnType::PRIMITIVE:
189 // flatbuffer void
190 return 0;
191 case ColumnType::CATEGORY: {
192 auto cat_meta = fbs::CreateCategoryMetadata(
193 fbb(), GetPrimitiveArray(fbb(), meta_category_.levels), meta_category_.ordered);
194 return cat_meta.Union();
195 }
196 case ColumnType::TIMESTAMP: {
197 // flatbuffer void
198 flatbuffers::Offset<flatbuffers::String> tz = 0;
199 if (!meta_timestamp_.timezone.empty()) {
200 tz = fbb().CreateString(meta_timestamp_.timezone);
201 }
202
203 auto ts_meta =
204 fbs::CreateTimestampMetadata(fbb(), ToFlatbufferEnum(meta_timestamp_.unit), tz);
205 return ts_meta.Union();
206 }
207 case ColumnType::DATE: {
208 auto date_meta = fbs::CreateDateMetadata(fbb());
209 return date_meta.Union();
210 }
211 case ColumnType::TIME: {
212 auto time_meta = fbs::CreateTimeMetadata(fbb(), ToFlatbufferEnum(meta_time_.unit));
213 return time_meta.Union();
214 }
215 default:
216 // null
217 return flatbuffers::Offset<void>();
218 }
219}
220
221Status ColumnBuilder::Finish() {
222 FBB& buf = fbb();
223
224 // values
225 auto values = GetPrimitiveArray(buf, values_);
226 flatbuffers::Offset<void> metadata = CreateColumnMetadata();
227
228 auto column = fbs::CreateColumn(buf, buf.CreateString(name_), values,
229 ToFlatbufferEnum(type_), // metadata_type
230 metadata, buf.CreateString(user_metadata_));
231
232 // bad coupling, but OK for now
233 parent_->add_column(column);
234 return Status::OK();
235}
236
237void ColumnBuilder::SetValues(const ArrayMetadata& values) { values_ = values; }
238
239void ColumnBuilder::SetUserMetadata(const std::string& data) { user_metadata_ = data; }
240
241void ColumnBuilder::SetCategory(const ArrayMetadata& levels, bool ordered) {
242 type_ = ColumnType::CATEGORY;
243 meta_category_.levels = levels;
244 meta_category_.ordered = ordered;
245}
246
247void ColumnBuilder::SetTimestamp(TimeUnit::type unit) {
248 type_ = ColumnType::TIMESTAMP;
249 meta_timestamp_.unit = unit;
250}
251
252void ColumnBuilder::SetTimestamp(TimeUnit::type unit, const std::string& timezone) {
253 SetTimestamp(unit);
254 meta_timestamp_.timezone = timezone;
255}
256
257void ColumnBuilder::SetDate() { type_ = ColumnType::DATE; }
258
259void ColumnBuilder::SetTime(TimeUnit::type unit) {
260 type_ = ColumnType::TIME;
261 meta_time_.unit = unit;
262}
263
264FBB& ColumnBuilder::fbb() { return *fbb_; }
265
266std::unique_ptr<ColumnBuilder> TableBuilder::AddColumn(const std::string& name) {
267 return std::unique_ptr<ColumnBuilder>(new ColumnBuilder(this, name));
268}
269
270// ----------------------------------------------------------------------
271// reader.cc
272
273class TableReader::TableReaderImpl {
274 public:
275 TableReaderImpl() {}
276
277 Status Open(const std::shared_ptr<io::RandomAccessFile>& source) {
278 source_ = source;
279
280 int magic_size = static_cast<int>(strlen(kFeatherMagicBytes));
281 int footer_size = magic_size + static_cast<int>(sizeof(uint32_t));
282
283 // Pathological issue where the file is smaller than
284 int64_t size = 0;
285 RETURN_NOT_OK(source->GetSize(&size));
286 if (size < magic_size + footer_size) {
287 return Status::Invalid("File is too small to be a well-formed file");
288 }
289
290 std::shared_ptr<Buffer> buffer;
291 RETURN_NOT_OK(source->ReadAt(0, magic_size, &buffer));
292
293 if (memcmp(buffer->data(), kFeatherMagicBytes, magic_size)) {
294 return Status::Invalid("Not a feather file");
295 }
296
297 // Now get the footer and verify
298 RETURN_NOT_OK(source->ReadAt(size - footer_size, footer_size, &buffer));
299
300 if (memcmp(buffer->data() + sizeof(uint32_t), kFeatherMagicBytes, magic_size)) {
301 return Status::Invalid("Feather file footer incomplete");
302 }
303
304 uint32_t metadata_length = *reinterpret_cast<const uint32_t*>(buffer->data());
305 if (size < magic_size + footer_size + metadata_length) {
306 return Status::Invalid("File is smaller than indicated metadata size");
307 }
308 RETURN_NOT_OK(
309 source->ReadAt(size - footer_size - metadata_length, metadata_length, &buffer));
310
311 metadata_.reset(new TableMetadata());
312 return metadata_->Open(buffer);
313 }
314
315 Status GetDataType(const fbs::PrimitiveArray* values, fbs::TypeMetadata metadata_type,
316 const void* metadata, std::shared_ptr<DataType>* out) {
317#define PRIMITIVE_CASE(CAP_TYPE, FACTORY_FUNC) \
318 case fbs::Type_##CAP_TYPE: \
319 *out = FACTORY_FUNC(); \
320 break;
321
322 switch (metadata_type) {
323 case fbs::TypeMetadata_CategoryMetadata: {
324 auto meta = static_cast<const fbs::CategoryMetadata*>(metadata);
325
326 std::shared_ptr<DataType> index_type;
327 RETURN_NOT_OK(GetDataType(values, fbs::TypeMetadata_NONE, nullptr, &index_type));
328
329 std::shared_ptr<Array> levels;
330 RETURN_NOT_OK(
331 LoadValues(meta->levels(), fbs::TypeMetadata_NONE, nullptr, &levels));
332
333 *out = std::make_shared<DictionaryType>(index_type, levels, meta->ordered());
334 break;
335 }
336 case fbs::TypeMetadata_TimestampMetadata: {
337 auto meta = static_cast<const fbs::TimestampMetadata*>(metadata);
338 TimeUnit::type unit = FromFlatbufferEnum(meta->unit());
339 std::string tz;
340 // flatbuffer non-null
341 if (meta->timezone() != 0) {
342 tz = meta->timezone()->str();
343 } else {
344 tz = "";
345 }
346 *out = timestamp(unit, tz);
347 } break;
348 case fbs::TypeMetadata_DateMetadata:
349 *out = date32();
350 break;
351 case fbs::TypeMetadata_TimeMetadata: {
352 auto meta = static_cast<const fbs::TimeMetadata*>(metadata);
353 *out = time32(FromFlatbufferEnum(meta->unit()));
354 } break;
355 default:
356 switch (values->type()) {
357 PRIMITIVE_CASE(BOOL, boolean);
358 PRIMITIVE_CASE(INT8, int8);
359 PRIMITIVE_CASE(INT16, int16);
360 PRIMITIVE_CASE(INT32, int32);
361 PRIMITIVE_CASE(INT64, int64);
362 PRIMITIVE_CASE(UINT8, uint8);
363 PRIMITIVE_CASE(UINT16, uint16);
364 PRIMITIVE_CASE(UINT32, uint32);
365 PRIMITIVE_CASE(UINT64, uint64);
366 PRIMITIVE_CASE(FLOAT, float32);
367 PRIMITIVE_CASE(DOUBLE, float64);
368 PRIMITIVE_CASE(UTF8, utf8);
369 PRIMITIVE_CASE(BINARY, binary);
370 default:
371 return Status::Invalid("Unrecognized type");
372 }
373 break;
374 }
375
376#undef PRIMITIVE_CASE
377
378 return Status::OK();
379 }
380
381 // Retrieve a primitive array from the data source
382 //
383 // @returns: a Buffer instance, the precise type will depend on the kind of
384 // input data source (which may or may not have memory-map like semantics)
385 Status LoadValues(const fbs::PrimitiveArray* meta, fbs::TypeMetadata metadata_type,
386 const void* metadata, std::shared_ptr<Array>* out) {
387 std::shared_ptr<DataType> type;
388 RETURN_NOT_OK(GetDataType(meta, metadata_type, metadata, &type));
389
390 std::vector<std::shared_ptr<Buffer>> buffers;
391
392 // Buffer data from the source (may or may not perform a copy depending on
393 // input source)
394 std::shared_ptr<Buffer> buffer;
395 RETURN_NOT_OK(source_->ReadAt(meta->offset(), meta->total_bytes(), &buffer));
396
397 int64_t offset = 0;
398
399 // If there are nulls, the null bitmask is first
400 if (meta->null_count() > 0) {
401 int64_t null_bitmap_size = GetOutputLength(BitUtil::BytesForBits(meta->length()));
402 buffers.push_back(SliceBuffer(buffer, offset, null_bitmap_size));
403 offset += null_bitmap_size;
404 } else {
405 buffers.push_back(nullptr);
406 }
407
408 if (is_binary_like(type->id())) {
409 int64_t offsets_size = GetOutputLength((meta->length() + 1) * sizeof(int32_t));
410 buffers.push_back(SliceBuffer(buffer, offset, offsets_size));
411 offset += offsets_size;
412 }
413
414 buffers.push_back(SliceBuffer(buffer, offset, buffer->size() - offset));
415
416 auto arr_data =
417 ArrayData::Make(type, meta->length(), std::move(buffers), meta->null_count());
418 *out = MakeArray(arr_data);
419 return Status::OK();
420 }
421
422 bool HasDescription() const { return metadata_->HasDescription(); }
423
424 std::string GetDescription() const { return metadata_->GetDescription(); }
425
426 int version() const { return metadata_->version(); }
427 int64_t num_rows() const { return metadata_->num_rows(); }
428 int64_t num_columns() const { return metadata_->num_columns(); }
429
430 std::string GetColumnName(int i) const {
431 const fbs::Column* col_meta = metadata_->column(i);
432 return col_meta->name()->str();
433 }
434
435 Status GetColumn(int i, std::shared_ptr<Column>* out) {
436 const fbs::Column* col_meta = metadata_->column(i);
437
438 // auto user_meta = column->user_metadata();
439 // if (user_meta->size() > 0) { user_metadata_ = user_meta->str(); }
440
441 std::shared_ptr<Array> values;
442 RETURN_NOT_OK(LoadValues(col_meta->values(), col_meta->metadata_type(),
443 col_meta->metadata(), &values));
444 out->reset(new Column(col_meta->name()->str(), values));
445 return Status::OK();
446 }
447
448 Status Read(std::shared_ptr<Table>* out) {
449 std::vector<std::shared_ptr<Field>> fields;
450 std::vector<std::shared_ptr<Column>> columns;
451 for (int i = 0; i < num_columns(); ++i) {
452 std::shared_ptr<Column> column;
453 RETURN_NOT_OK(GetColumn(i, &column));
454 columns.push_back(column);
455 fields.push_back(column->field());
456 }
457 *out = Table::Make(schema(fields), columns);
458 return Status::OK();
459 }
460
461 Status Read(const std::vector<int>& indices, std::shared_ptr<Table>* out) {
462 std::vector<std::shared_ptr<Field>> fields;
463 std::vector<std::shared_ptr<Column>> columns;
464 for (int i = 0; i < num_columns(); ++i) {
465 bool found = false;
466 for (auto j : indices) {
467 if (i == j) {
468 found = true;
469 break;
470 }
471 }
472 if (!found) {
473 continue;
474 }
475 std::shared_ptr<Column> column;
476 RETURN_NOT_OK(GetColumn(i, &column));
477 columns.push_back(column);
478 fields.push_back(column->field());
479 }
480 *out = Table::Make(schema(fields), columns);
481 return Status::OK();
482 }
483
484 Status Read(const std::vector<std::string>& names, std::shared_ptr<Table>* out) {
485 std::vector<std::shared_ptr<Field>> fields;
486 std::vector<std::shared_ptr<Column>> columns;
487 for (int i = 0; i < num_columns(); ++i) {
488 auto name = GetColumnName(i);
489 bool found = false;
490 for (auto& n : names) {
491 if (name == n) {
492 found = true;
493 break;
494 }
495 }
496 if (!found) {
497 continue;
498 }
499 std::shared_ptr<Column> column;
500 RETURN_NOT_OK(GetColumn(i, &column));
501 columns.push_back(column);
502 fields.push_back(column->field());
503 }
504 *out = Table::Make(schema(fields), columns);
505 return Status::OK();
506 }
507
508 private:
509 std::shared_ptr<io::RandomAccessFile> source_;
510 std::unique_ptr<TableMetadata> metadata_;
511
512 std::shared_ptr<Schema> schema_;
513};
514
515// ----------------------------------------------------------------------
516// TableReader public API
517
518TableReader::TableReader() { impl_.reset(new TableReaderImpl()); }
519
520TableReader::~TableReader() {}
521
522Status TableReader::Open(const std::shared_ptr<io::RandomAccessFile>& source,
523 std::unique_ptr<TableReader>* out) {
524 out->reset(new TableReader());
525 return (*out)->impl_->Open(source);
526}
527
528bool TableReader::HasDescription() const { return impl_->HasDescription(); }
529
530std::string TableReader::GetDescription() const { return impl_->GetDescription(); }
531
532int TableReader::version() const { return impl_->version(); }
533
534int64_t TableReader::num_rows() const { return impl_->num_rows(); }
535
536int64_t TableReader::num_columns() const { return impl_->num_columns(); }
537
538std::string TableReader::GetColumnName(int i) const { return impl_->GetColumnName(i); }
539
540Status TableReader::GetColumn(int i, std::shared_ptr<Column>* out) {
541 return impl_->GetColumn(i, out);
542}
543
544Status TableReader::Read(std::shared_ptr<Table>* out) { return impl_->Read(out); }
545
546Status TableReader::Read(const std::vector<int>& indices, std::shared_ptr<Table>* out) {
547 return impl_->Read(indices, out);
548}
549
550Status TableReader::Read(const std::vector<std::string>& names,
551 std::shared_ptr<Table>* out) {
552 return impl_->Read(names, out);
553}
554
555// ----------------------------------------------------------------------
556// writer.cc
557
558fbs::Type ToFlatbufferType(Type::type type) {
559 switch (type) {
560 case Type::BOOL:
561 return fbs::Type_BOOL;
562 case Type::INT8:
563 return fbs::Type_INT8;
564 case Type::INT16:
565 return fbs::Type_INT16;
566 case Type::INT32:
567 return fbs::Type_INT32;
568 case Type::INT64:
569 return fbs::Type_INT64;
570 case Type::UINT8:
571 return fbs::Type_UINT8;
572 case Type::UINT16:
573 return fbs::Type_UINT16;
574 case Type::UINT32:
575 return fbs::Type_UINT32;
576 case Type::UINT64:
577 return fbs::Type_UINT64;
578 case Type::FLOAT:
579 return fbs::Type_FLOAT;
580 case Type::DOUBLE:
581 return fbs::Type_DOUBLE;
582 case Type::STRING:
583 return fbs::Type_UTF8;
584 case Type::BINARY:
585 return fbs::Type_BINARY;
586 case Type::DATE32:
587 return fbs::Type_INT32;
588 case Type::TIMESTAMP:
589 return fbs::Type_INT64;
590 case Type::TIME32:
591 return fbs::Type_INT32;
592 case Type::TIME64:
593 return fbs::Type_INT64;
594 default:
595 DCHECK(false) << "Cannot reach this code";
596 }
597 // prevent compiler warning
598 return fbs::Type_MIN;
599}
600
601static Status SanitizeUnsupportedTypes(const Array& values, std::shared_ptr<Array>* out) {
602 if (values.type_id() == Type::NA) {
603 // As long as R doesn't support NA, we write this as a StringColumn
604 // to ensure stable roundtrips.
605 *out = std::make_shared<StringArray>(values.length(), nullptr, nullptr,
606 values.null_bitmap(), values.null_count());
607 return Status::OK();
608 } else {
609 *out = MakeArray(values.data());
610 return Status::OK();
611 }
612}
613
614class TableWriter::TableWriterImpl : public ArrayVisitor {
615 public:
616 TableWriterImpl() : initialized_stream_(false), metadata_(0) {}
617
618 Status Open(const std::shared_ptr<io::OutputStream>& stream) {
619 stream_ = stream;
620 return Status::OK();
621 }
622
623 void SetDescription(const std::string& desc) { metadata_.SetDescription(desc); }
624
625 void SetNumRows(int64_t num_rows) { metadata_.SetNumRows(num_rows); }
626
627 Status Finalize() {
628 RETURN_NOT_OK(CheckStarted());
629 RETURN_NOT_OK(metadata_.Finish());
630
631 auto buffer = metadata_.GetBuffer();
632
633 // Writer metadata
634 int64_t bytes_written;
635 RETURN_NOT_OK(
636 WritePadded(stream_.get(), buffer->data(), buffer->size(), &bytes_written));
637 uint32_t buffer_size = static_cast<uint32_t>(bytes_written);
638
639 // Footer: metadata length, magic bytes
640 RETURN_NOT_OK(stream_->Write(&buffer_size, sizeof(uint32_t)));
641 return stream_->Write(kFeatherMagicBytes, strlen(kFeatherMagicBytes));
642 }
643
644 Status LoadArrayMetadata(const Array& values, ArrayMetadata* meta) {
645 if (!(is_primitive(values.type_id()) || is_binary_like(values.type_id()))) {
646 return Status::Invalid("Array is not primitive type: ", values.type()->ToString());
647 }
648
649 meta->type = ToFlatbufferType(values.type_id());
650
651 RETURN_NOT_OK(stream_->Tell(&meta->offset));
652
653 meta->length = values.length();
654 meta->null_count = values.null_count();
655 meta->total_bytes = 0;
656
657 return Status::OK();
658 }
659
660 Status WriteArray(const Array& values, ArrayMetadata* meta) {
661 RETURN_NOT_OK(CheckStarted());
662 RETURN_NOT_OK(LoadArrayMetadata(values, meta));
663
664 int64_t bytes_written;
665
666 // Write the null bitmask
667 if (values.null_count() > 0) {
668 // We assume there is one bit for each value in values.nulls,
669 // starting at the zero offset.
670 int64_t null_bitmap_size = GetOutputLength(BitUtil::BytesForBits(values.length()));
671 if (values.null_bitmap()) {
672 auto null_bitmap = values.null_bitmap();
673 RETURN_NOT_OK(WritePaddedWithOffset(stream_.get(), null_bitmap->data(),
674 values.offset(), null_bitmap_size,
675 &bytes_written));
676 } else {
677 RETURN_NOT_OK(WritePaddedBlank(stream_.get(), null_bitmap_size, &bytes_written));
678 }
679 meta->total_bytes += bytes_written;
680 }
681
682 int64_t values_bytes = 0;
683 int64_t bit_offset = 0;
684
685 const uint8_t* values_buffer = nullptr;
686
687 if (is_binary_like(values.type_id())) {
688 const auto& bin_values = checked_cast<const BinaryArray&>(values);
689
690 int64_t offset_bytes = sizeof(int32_t) * (values.length() + 1);
691
692 if (bin_values.value_offsets()) {
693 values_bytes = bin_values.raw_value_offsets()[values.length()];
694
695 // Write the variable-length offsets
696 RETURN_NOT_OK(
697 WritePadded(stream_.get(),
698 reinterpret_cast<const uint8_t*>(bin_values.raw_value_offsets()),
699 offset_bytes, &bytes_written));
700 } else {
701 RETURN_NOT_OK(WritePaddedBlank(stream_.get(), offset_bytes, &bytes_written));
702 }
703 meta->total_bytes += bytes_written;
704
705 if (bin_values.value_data()) {
706 values_buffer = bin_values.value_data()->data();
707 }
708 } else {
709 const auto& prim_values = checked_cast<const PrimitiveArray&>(values);
710 const auto& fw_type = checked_cast<const FixedWidthType&>(*values.type());
711
712 values_bytes = BitUtil::BytesForBits(values.length() * fw_type.bit_width());
713
714 if (prim_values.values()) {
715 values_buffer = prim_values.values()->data() +
716 (prim_values.offset() * fw_type.bit_width() / 8);
717 bit_offset = (prim_values.offset() * fw_type.bit_width()) % 8;
718 }
719 }
720 if (values_buffer) {
721 RETURN_NOT_OK(WritePaddedWithOffset(stream_.get(), values_buffer, bit_offset,
722 values_bytes, &bytes_written));
723 } else {
724 RETURN_NOT_OK(WritePaddedBlank(stream_.get(), values_bytes, &bytes_written));
725 }
726 meta->total_bytes += bytes_written;
727
728 return Status::OK();
729 }
730
731 Status WritePrimitiveValues(const Array& values) {
732 // Prepare metadata payload
733 ArrayMetadata meta;
734 RETURN_NOT_OK(WriteArray(values, &meta));
735 current_column_->SetValues(meta);
736 return Status::OK();
737 }
738
739 Status Visit(const NullArray& values) override {
740 std::shared_ptr<Array> sanitized_nulls;
741 RETURN_NOT_OK(SanitizeUnsupportedTypes(values, &sanitized_nulls));
742 return WritePrimitiveValues(*sanitized_nulls);
743 }
744
745#define VISIT_PRIMITIVE(TYPE) \
746 Status Visit(const TYPE& values) override { return WritePrimitiveValues(values); }
747
748 VISIT_PRIMITIVE(BooleanArray)
749 VISIT_PRIMITIVE(Int8Array)
750 VISIT_PRIMITIVE(Int16Array)
751 VISIT_PRIMITIVE(Int32Array)
752 VISIT_PRIMITIVE(Int64Array)
753 VISIT_PRIMITIVE(UInt8Array)
754 VISIT_PRIMITIVE(UInt16Array)
755 VISIT_PRIMITIVE(UInt32Array)
756 VISIT_PRIMITIVE(UInt64Array)
757 VISIT_PRIMITIVE(FloatArray)
758 VISIT_PRIMITIVE(DoubleArray)
759 VISIT_PRIMITIVE(BinaryArray)
760 VISIT_PRIMITIVE(StringArray)
761
762#undef VISIT_PRIMITIVE
763
764 Status Visit(const DictionaryArray& values) override {
765 const auto& dict_type = checked_cast<const DictionaryType&>(*values.type());
766
767 if (!is_integer(values.indices()->type_id())) {
768 return Status::Invalid("Category values must be integers");
769 }
770
771 RETURN_NOT_OK(WritePrimitiveValues(*values.indices()));
772
773 ArrayMetadata levels_meta;
774 std::shared_ptr<Array> sanitized_dictionary;
775 RETURN_NOT_OK(
776 SanitizeUnsupportedTypes(*dict_type.dictionary(), &sanitized_dictionary));
777 RETURN_NOT_OK(WriteArray(*sanitized_dictionary, &levels_meta));
778 current_column_->SetCategory(levels_meta, dict_type.ordered());
779 return Status::OK();
780 }
781
782 Status Visit(const TimestampArray& values) override {
783 RETURN_NOT_OK(WritePrimitiveValues(values));
784 const auto& ts_type = checked_cast<const TimestampType&>(*values.type());
785 current_column_->SetTimestamp(ts_type.unit(), ts_type.timezone());
786 return Status::OK();
787 }
788
789 Status Visit(const Date32Array& values) override {
790 RETURN_NOT_OK(WritePrimitiveValues(values));
791 current_column_->SetDate();
792 return Status::OK();
793 }
794
795 Status Visit(const Time32Array& values) override {
796 RETURN_NOT_OK(WritePrimitiveValues(values));
797 auto unit = checked_cast<const Time32Type&>(*values.type()).unit();
798 current_column_->SetTime(unit);
799 return Status::OK();
800 }
801
802 Status Visit(const Time64Array& values) override {
803 return Status::NotImplemented("time64");
804 }
805
806 Status Append(const std::string& name, const Array& values) {
807 current_column_ = metadata_.AddColumn(name);
808 RETURN_NOT_OK(values.Accept(this));
809 return current_column_->Finish();
810 }
811
812 Status Write(const Table& table) {
813 for (int i = 0; i < table.num_columns(); ++i) {
814 auto column = table.column(i);
815 current_column_ = metadata_.AddColumn(column->name());
816 auto chunked_array = column->data();
817 for (const auto chunk : chunked_array->chunks()) {
818 RETURN_NOT_OK(chunk->Accept(this));
819 }
820 RETURN_NOT_OK(current_column_->Finish());
821 }
822 return Status::OK();
823 }
824
825 private:
826 Status CheckStarted() {
827 if (!initialized_stream_) {
828 int64_t bytes_written_unused;
829 RETURN_NOT_OK(WritePadded(stream_.get(),
830 reinterpret_cast<const uint8_t*>(kFeatherMagicBytes),
831 strlen(kFeatherMagicBytes), &bytes_written_unused));
832 initialized_stream_ = true;
833 }
834 return Status::OK();
835 }
836
837 std::shared_ptr<io::OutputStream> stream_;
838
839 bool initialized_stream_;
840 TableBuilder metadata_;
841
842 std::unique_ptr<ColumnBuilder> current_column_;
843
844 Status AppendPrimitive(const PrimitiveArray& values, ArrayMetadata* out);
845};
846
847TableWriter::TableWriter() { impl_.reset(new TableWriterImpl()); }
848
849TableWriter::~TableWriter() {}
850
851Status TableWriter::Open(const std::shared_ptr<io::OutputStream>& stream,
852 std::unique_ptr<TableWriter>* out) {
853 out->reset(new TableWriter());
854 return (*out)->impl_->Open(stream);
855}
856
857void TableWriter::SetDescription(const std::string& desc) { impl_->SetDescription(desc); }
858
859void TableWriter::SetNumRows(int64_t num_rows) { impl_->SetNumRows(num_rows); }
860
861Status TableWriter::Append(const std::string& name, const Array& values) {
862 return impl_->Append(name, values);
863}
864
865Status TableWriter::Write(const Table& table) { return impl_->Write(table); }
866
867Status TableWriter::Finalize() { return impl_->Finalize(); }
868
869} // namespace feather
870} // namespace ipc
871} // namespace arrow
872