1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/ipc/feather.h" |
19 | |
20 | #include <cstdint> |
21 | #include <cstring> |
22 | #include <memory> |
23 | #include <sstream> // IWYU pragma: keep |
24 | #include <string> |
25 | #include <utility> |
26 | #include <vector> |
27 | |
28 | #include "flatbuffers/flatbuffers.h" |
29 | |
30 | #include "arrow/array.h" |
31 | #include "arrow/buffer.h" |
32 | #include "arrow/io/interfaces.h" |
33 | #include "arrow/ipc/feather-internal.h" |
34 | #include "arrow/ipc/feather_generated.h" |
35 | #include "arrow/ipc/util.h" // IWYU pragma: keep |
36 | #include "arrow/status.h" |
37 | #include "arrow/table.h" // IWYU pragma: keep |
38 | #include "arrow/type.h" |
39 | #include "arrow/type_traits.h" |
40 | #include "arrow/util/bit-util.h" |
41 | #include "arrow/util/checked_cast.h" |
42 | #include "arrow/util/logging.h" |
43 | #include "arrow/visitor.h" |
44 | |
45 | namespace arrow { |
46 | |
47 | using internal::checked_cast; |
48 | |
49 | namespace ipc { |
50 | namespace feather { |
51 | |
// Zero-filled block of kFeatherDefaultAlignment bytes. The write helpers below
// use it as the source of the padding bytes appended after each buffer.
static const uint8_t kPaddingBytes[kFeatherDefaultAlignment] = {0};
53 | |
54 | static inline int64_t PaddedLength(int64_t nbytes) { |
55 | static const int64_t alignment = kFeatherDefaultAlignment; |
56 | return ((nbytes + alignment - 1) / alignment) * alignment; |
57 | } |
58 | |
59 | // XXX: Hack for Feather 0.3.0 for backwards compatibility with old files |
60 | // Size in-file of written byte buffer |
61 | static int64_t GetOutputLength(int64_t nbytes) { |
62 | if (kFeatherVersion < 2) { |
63 | // Feather files < 0.3.0 |
64 | return nbytes; |
65 | } else { |
66 | return PaddedLength(nbytes); |
67 | } |
68 | } |
69 | |
70 | static Status WritePadded(io::OutputStream* stream, const uint8_t* data, int64_t length, |
71 | int64_t* bytes_written) { |
72 | RETURN_NOT_OK(stream->Write(data, length)); |
73 | |
74 | int64_t remainder = PaddedLength(length) - length; |
75 | if (remainder != 0) { |
76 | RETURN_NOT_OK(stream->Write(kPaddingBytes, remainder)); |
77 | } |
78 | *bytes_written = length + remainder; |
79 | return Status::OK(); |
80 | } |
81 | |
82 | static Status WritePaddedWithOffset(io::OutputStream* stream, const uint8_t* data, |
83 | int64_t bit_offset, const int64_t length, |
84 | int64_t* bytes_written) { |
85 | data = data + bit_offset / 8; |
86 | uint8_t bit_shift = static_cast<uint8_t>(bit_offset % 8); |
87 | if (bit_offset == 0) { |
88 | RETURN_NOT_OK(stream->Write(data, length)); |
89 | } else { |
90 | constexpr int64_t buffersize = 256; |
91 | uint8_t buffer[buffersize]; |
92 | const uint8_t lshift = static_cast<uint8_t>(8 - bit_shift); |
93 | const uint8_t* buffer_end = buffer + buffersize; |
94 | uint8_t* buffer_it = buffer; |
95 | |
96 | for (const uint8_t* end = data + length; data != end;) { |
97 | uint8_t r = static_cast<uint8_t>(*data++ >> bit_shift); |
98 | uint8_t l = static_cast<uint8_t>(*data << lshift); |
99 | uint8_t value = l | r; |
100 | *buffer_it++ = value; |
101 | if (buffer_it == buffer_end) { |
102 | RETURN_NOT_OK(stream->Write(buffer, buffersize)); |
103 | buffer_it = buffer; |
104 | } |
105 | } |
106 | if (buffer_it != buffer) { |
107 | RETURN_NOT_OK(stream->Write(buffer, buffer_it - buffer)); |
108 | } |
109 | } |
110 | |
111 | int64_t remainder = PaddedLength(length) - length; |
112 | if (remainder != 0) { |
113 | RETURN_NOT_OK(stream->Write(kPaddingBytes, remainder)); |
114 | } |
115 | *bytes_written = length + remainder; |
116 | return Status::OK(); |
117 | } |
118 | |
119 | /// For compability, we need to write any data sometimes just to keep producing |
120 | /// files that can be read with an older reader. |
121 | static Status WritePaddedBlank(io::OutputStream* stream, int64_t length, |
122 | int64_t* bytes_written) { |
123 | const uint8_t null = 0; |
124 | for (int64_t i = 0; i < length; i++) { |
125 | RETURN_NOT_OK(stream->Write(&null, 1)); |
126 | } |
127 | |
128 | int64_t remainder = PaddedLength(length) - length; |
129 | if (remainder != 0) { |
130 | RETURN_NOT_OK(stream->Write(kPaddingBytes, remainder)); |
131 | } |
132 | *bytes_written = length + remainder; |
133 | return Status::OK(); |
134 | } |
135 | |
136 | // ---------------------------------------------------------------------- |
137 | // TableBuilder |
138 | |
139 | TableBuilder::TableBuilder(int64_t num_rows) : finished_(false), num_rows_(num_rows) {} |
140 | |
141 | FBB& TableBuilder::fbb() { return fbb_; } |
142 | |
143 | Status TableBuilder::Finish() { |
144 | if (finished_) { |
145 | return Status::Invalid("can only call this once" ); |
146 | } |
147 | |
148 | FBString desc = 0; |
149 | if (!description_.empty()) { |
150 | desc = fbb_.CreateString(description_); |
151 | } |
152 | |
153 | flatbuffers::Offset<flatbuffers::String> metadata = 0; |
154 | |
155 | auto root = fbs::CreateCTable(fbb_, desc, num_rows_, fbb_.CreateVector(columns_), |
156 | kFeatherVersion, metadata); |
157 | fbb_.Finish(root); |
158 | finished_ = true; |
159 | |
160 | return Status::OK(); |
161 | } |
162 | |
163 | std::shared_ptr<Buffer> TableBuilder::GetBuffer() const { |
164 | return std::make_shared<Buffer>(fbb_.GetBufferPointer(), |
165 | static_cast<int64_t>(fbb_.GetSize())); |
166 | } |
167 | |
168 | void TableBuilder::SetDescription(const std::string& description) { |
169 | description_ = description; |
170 | } |
171 | |
172 | void TableBuilder::SetNumRows(int64_t num_rows) { num_rows_ = num_rows; } |
173 | |
174 | void TableBuilder::add_column(const flatbuffers::Offset<fbs::Column>& col) { |
175 | columns_.push_back(col); |
176 | } |
177 | |
178 | ColumnBuilder::ColumnBuilder(TableBuilder* parent, const std::string& name) |
179 | : parent_(parent) { |
180 | fbb_ = &parent->fbb(); |
181 | name_ = name; |
182 | type_ = ColumnType::PRIMITIVE; |
183 | meta_time_.unit = TimeUnit::SECOND; |
184 | } |
185 | |
186 | flatbuffers::Offset<void> ColumnBuilder::CreateColumnMetadata() { |
187 | switch (type_) { |
188 | case ColumnType::PRIMITIVE: |
189 | // flatbuffer void |
190 | return 0; |
191 | case ColumnType::CATEGORY: { |
192 | auto cat_meta = fbs::CreateCategoryMetadata( |
193 | fbb(), GetPrimitiveArray(fbb(), meta_category_.levels), meta_category_.ordered); |
194 | return cat_meta.Union(); |
195 | } |
196 | case ColumnType::TIMESTAMP: { |
197 | // flatbuffer void |
198 | flatbuffers::Offset<flatbuffers::String> tz = 0; |
199 | if (!meta_timestamp_.timezone.empty()) { |
200 | tz = fbb().CreateString(meta_timestamp_.timezone); |
201 | } |
202 | |
203 | auto ts_meta = |
204 | fbs::CreateTimestampMetadata(fbb(), ToFlatbufferEnum(meta_timestamp_.unit), tz); |
205 | return ts_meta.Union(); |
206 | } |
207 | case ColumnType::DATE: { |
208 | auto date_meta = fbs::CreateDateMetadata(fbb()); |
209 | return date_meta.Union(); |
210 | } |
211 | case ColumnType::TIME: { |
212 | auto time_meta = fbs::CreateTimeMetadata(fbb(), ToFlatbufferEnum(meta_time_.unit)); |
213 | return time_meta.Union(); |
214 | } |
215 | default: |
216 | // null |
217 | return flatbuffers::Offset<void>(); |
218 | } |
219 | } |
220 | |
221 | Status ColumnBuilder::Finish() { |
222 | FBB& buf = fbb(); |
223 | |
224 | // values |
225 | auto values = GetPrimitiveArray(buf, values_); |
226 | flatbuffers::Offset<void> metadata = CreateColumnMetadata(); |
227 | |
228 | auto column = fbs::CreateColumn(buf, buf.CreateString(name_), values, |
229 | ToFlatbufferEnum(type_), // metadata_type |
230 | metadata, buf.CreateString(user_metadata_)); |
231 | |
232 | // bad coupling, but OK for now |
233 | parent_->add_column(column); |
234 | return Status::OK(); |
235 | } |
236 | |
237 | void ColumnBuilder::SetValues(const ArrayMetadata& values) { values_ = values; } |
238 | |
239 | void ColumnBuilder::SetUserMetadata(const std::string& data) { user_metadata_ = data; } |
240 | |
241 | void ColumnBuilder::SetCategory(const ArrayMetadata& levels, bool ordered) { |
242 | type_ = ColumnType::CATEGORY; |
243 | meta_category_.levels = levels; |
244 | meta_category_.ordered = ordered; |
245 | } |
246 | |
247 | void ColumnBuilder::SetTimestamp(TimeUnit::type unit) { |
248 | type_ = ColumnType::TIMESTAMP; |
249 | meta_timestamp_.unit = unit; |
250 | } |
251 | |
252 | void ColumnBuilder::SetTimestamp(TimeUnit::type unit, const std::string& timezone) { |
253 | SetTimestamp(unit); |
254 | meta_timestamp_.timezone = timezone; |
255 | } |
256 | |
257 | void ColumnBuilder::SetDate() { type_ = ColumnType::DATE; } |
258 | |
259 | void ColumnBuilder::SetTime(TimeUnit::type unit) { |
260 | type_ = ColumnType::TIME; |
261 | meta_time_.unit = unit; |
262 | } |
263 | |
264 | FBB& ColumnBuilder::fbb() { return *fbb_; } |
265 | |
266 | std::unique_ptr<ColumnBuilder> TableBuilder::AddColumn(const std::string& name) { |
267 | return std::unique_ptr<ColumnBuilder>(new ColumnBuilder(this, name)); |
268 | } |
269 | |
270 | // ---------------------------------------------------------------------- |
271 | // reader.cc |
272 | |
273 | class TableReader::TableReaderImpl { |
274 | public: |
275 | TableReaderImpl() {} |
276 | |
277 | Status Open(const std::shared_ptr<io::RandomAccessFile>& source) { |
278 | source_ = source; |
279 | |
280 | int magic_size = static_cast<int>(strlen(kFeatherMagicBytes)); |
281 | int = magic_size + static_cast<int>(sizeof(uint32_t)); |
282 | |
283 | // Pathological issue where the file is smaller than |
284 | int64_t size = 0; |
285 | RETURN_NOT_OK(source->GetSize(&size)); |
286 | if (size < magic_size + footer_size) { |
287 | return Status::Invalid("File is too small to be a well-formed file" ); |
288 | } |
289 | |
290 | std::shared_ptr<Buffer> buffer; |
291 | RETURN_NOT_OK(source->ReadAt(0, magic_size, &buffer)); |
292 | |
293 | if (memcmp(buffer->data(), kFeatherMagicBytes, magic_size)) { |
294 | return Status::Invalid("Not a feather file" ); |
295 | } |
296 | |
297 | // Now get the footer and verify |
298 | RETURN_NOT_OK(source->ReadAt(size - footer_size, footer_size, &buffer)); |
299 | |
300 | if (memcmp(buffer->data() + sizeof(uint32_t), kFeatherMagicBytes, magic_size)) { |
301 | return Status::Invalid("Feather file footer incomplete" ); |
302 | } |
303 | |
304 | uint32_t metadata_length = *reinterpret_cast<const uint32_t*>(buffer->data()); |
305 | if (size < magic_size + footer_size + metadata_length) { |
306 | return Status::Invalid("File is smaller than indicated metadata size" ); |
307 | } |
308 | RETURN_NOT_OK( |
309 | source->ReadAt(size - footer_size - metadata_length, metadata_length, &buffer)); |
310 | |
311 | metadata_.reset(new TableMetadata()); |
312 | return metadata_->Open(buffer); |
313 | } |
314 | |
315 | Status GetDataType(const fbs::PrimitiveArray* values, fbs::TypeMetadata metadata_type, |
316 | const void* metadata, std::shared_ptr<DataType>* out) { |
317 | #define PRIMITIVE_CASE(CAP_TYPE, FACTORY_FUNC) \ |
318 | case fbs::Type_##CAP_TYPE: \ |
319 | *out = FACTORY_FUNC(); \ |
320 | break; |
321 | |
322 | switch (metadata_type) { |
323 | case fbs::TypeMetadata_CategoryMetadata: { |
324 | auto meta = static_cast<const fbs::CategoryMetadata*>(metadata); |
325 | |
326 | std::shared_ptr<DataType> index_type; |
327 | RETURN_NOT_OK(GetDataType(values, fbs::TypeMetadata_NONE, nullptr, &index_type)); |
328 | |
329 | std::shared_ptr<Array> levels; |
330 | RETURN_NOT_OK( |
331 | LoadValues(meta->levels(), fbs::TypeMetadata_NONE, nullptr, &levels)); |
332 | |
333 | *out = std::make_shared<DictionaryType>(index_type, levels, meta->ordered()); |
334 | break; |
335 | } |
336 | case fbs::TypeMetadata_TimestampMetadata: { |
337 | auto meta = static_cast<const fbs::TimestampMetadata*>(metadata); |
338 | TimeUnit::type unit = FromFlatbufferEnum(meta->unit()); |
339 | std::string tz; |
340 | // flatbuffer non-null |
341 | if (meta->timezone() != 0) { |
342 | tz = meta->timezone()->str(); |
343 | } else { |
344 | tz = "" ; |
345 | } |
346 | *out = timestamp(unit, tz); |
347 | } break; |
348 | case fbs::TypeMetadata_DateMetadata: |
349 | *out = date32(); |
350 | break; |
351 | case fbs::TypeMetadata_TimeMetadata: { |
352 | auto meta = static_cast<const fbs::TimeMetadata*>(metadata); |
353 | *out = time32(FromFlatbufferEnum(meta->unit())); |
354 | } break; |
355 | default: |
356 | switch (values->type()) { |
357 | PRIMITIVE_CASE(BOOL, boolean); |
358 | PRIMITIVE_CASE(INT8, int8); |
359 | PRIMITIVE_CASE(INT16, int16); |
360 | PRIMITIVE_CASE(INT32, int32); |
361 | PRIMITIVE_CASE(INT64, int64); |
362 | PRIMITIVE_CASE(UINT8, uint8); |
363 | PRIMITIVE_CASE(UINT16, uint16); |
364 | PRIMITIVE_CASE(UINT32, uint32); |
365 | PRIMITIVE_CASE(UINT64, uint64); |
366 | PRIMITIVE_CASE(FLOAT, float32); |
367 | PRIMITIVE_CASE(DOUBLE, float64); |
368 | PRIMITIVE_CASE(UTF8, utf8); |
369 | PRIMITIVE_CASE(BINARY, binary); |
370 | default: |
371 | return Status::Invalid("Unrecognized type" ); |
372 | } |
373 | break; |
374 | } |
375 | |
376 | #undef PRIMITIVE_CASE |
377 | |
378 | return Status::OK(); |
379 | } |
380 | |
381 | // Retrieve a primitive array from the data source |
382 | // |
383 | // @returns: a Buffer instance, the precise type will depend on the kind of |
384 | // input data source (which may or may not have memory-map like semantics) |
385 | Status LoadValues(const fbs::PrimitiveArray* meta, fbs::TypeMetadata metadata_type, |
386 | const void* metadata, std::shared_ptr<Array>* out) { |
387 | std::shared_ptr<DataType> type; |
388 | RETURN_NOT_OK(GetDataType(meta, metadata_type, metadata, &type)); |
389 | |
390 | std::vector<std::shared_ptr<Buffer>> buffers; |
391 | |
392 | // Buffer data from the source (may or may not perform a copy depending on |
393 | // input source) |
394 | std::shared_ptr<Buffer> buffer; |
395 | RETURN_NOT_OK(source_->ReadAt(meta->offset(), meta->total_bytes(), &buffer)); |
396 | |
397 | int64_t offset = 0; |
398 | |
399 | // If there are nulls, the null bitmask is first |
400 | if (meta->null_count() > 0) { |
401 | int64_t null_bitmap_size = GetOutputLength(BitUtil::BytesForBits(meta->length())); |
402 | buffers.push_back(SliceBuffer(buffer, offset, null_bitmap_size)); |
403 | offset += null_bitmap_size; |
404 | } else { |
405 | buffers.push_back(nullptr); |
406 | } |
407 | |
408 | if (is_binary_like(type->id())) { |
409 | int64_t offsets_size = GetOutputLength((meta->length() + 1) * sizeof(int32_t)); |
410 | buffers.push_back(SliceBuffer(buffer, offset, offsets_size)); |
411 | offset += offsets_size; |
412 | } |
413 | |
414 | buffers.push_back(SliceBuffer(buffer, offset, buffer->size() - offset)); |
415 | |
416 | auto arr_data = |
417 | ArrayData::Make(type, meta->length(), std::move(buffers), meta->null_count()); |
418 | *out = MakeArray(arr_data); |
419 | return Status::OK(); |
420 | } |
421 | |
422 | bool HasDescription() const { return metadata_->HasDescription(); } |
423 | |
424 | std::string GetDescription() const { return metadata_->GetDescription(); } |
425 | |
426 | int version() const { return metadata_->version(); } |
427 | int64_t num_rows() const { return metadata_->num_rows(); } |
428 | int64_t num_columns() const { return metadata_->num_columns(); } |
429 | |
430 | std::string GetColumnName(int i) const { |
431 | const fbs::Column* col_meta = metadata_->column(i); |
432 | return col_meta->name()->str(); |
433 | } |
434 | |
435 | Status GetColumn(int i, std::shared_ptr<Column>* out) { |
436 | const fbs::Column* col_meta = metadata_->column(i); |
437 | |
438 | // auto user_meta = column->user_metadata(); |
439 | // if (user_meta->size() > 0) { user_metadata_ = user_meta->str(); } |
440 | |
441 | std::shared_ptr<Array> values; |
442 | RETURN_NOT_OK(LoadValues(col_meta->values(), col_meta->metadata_type(), |
443 | col_meta->metadata(), &values)); |
444 | out->reset(new Column(col_meta->name()->str(), values)); |
445 | return Status::OK(); |
446 | } |
447 | |
448 | Status Read(std::shared_ptr<Table>* out) { |
449 | std::vector<std::shared_ptr<Field>> fields; |
450 | std::vector<std::shared_ptr<Column>> columns; |
451 | for (int i = 0; i < num_columns(); ++i) { |
452 | std::shared_ptr<Column> column; |
453 | RETURN_NOT_OK(GetColumn(i, &column)); |
454 | columns.push_back(column); |
455 | fields.push_back(column->field()); |
456 | } |
457 | *out = Table::Make(schema(fields), columns); |
458 | return Status::OK(); |
459 | } |
460 | |
461 | Status Read(const std::vector<int>& indices, std::shared_ptr<Table>* out) { |
462 | std::vector<std::shared_ptr<Field>> fields; |
463 | std::vector<std::shared_ptr<Column>> columns; |
464 | for (int i = 0; i < num_columns(); ++i) { |
465 | bool found = false; |
466 | for (auto j : indices) { |
467 | if (i == j) { |
468 | found = true; |
469 | break; |
470 | } |
471 | } |
472 | if (!found) { |
473 | continue; |
474 | } |
475 | std::shared_ptr<Column> column; |
476 | RETURN_NOT_OK(GetColumn(i, &column)); |
477 | columns.push_back(column); |
478 | fields.push_back(column->field()); |
479 | } |
480 | *out = Table::Make(schema(fields), columns); |
481 | return Status::OK(); |
482 | } |
483 | |
484 | Status Read(const std::vector<std::string>& names, std::shared_ptr<Table>* out) { |
485 | std::vector<std::shared_ptr<Field>> fields; |
486 | std::vector<std::shared_ptr<Column>> columns; |
487 | for (int i = 0; i < num_columns(); ++i) { |
488 | auto name = GetColumnName(i); |
489 | bool found = false; |
490 | for (auto& n : names) { |
491 | if (name == n) { |
492 | found = true; |
493 | break; |
494 | } |
495 | } |
496 | if (!found) { |
497 | continue; |
498 | } |
499 | std::shared_ptr<Column> column; |
500 | RETURN_NOT_OK(GetColumn(i, &column)); |
501 | columns.push_back(column); |
502 | fields.push_back(column->field()); |
503 | } |
504 | *out = Table::Make(schema(fields), columns); |
505 | return Status::OK(); |
506 | } |
507 | |
508 | private: |
509 | std::shared_ptr<io::RandomAccessFile> source_; |
510 | std::unique_ptr<TableMetadata> metadata_; |
511 | |
512 | std::shared_ptr<Schema> schema_; |
513 | }; |
514 | |
515 | // ---------------------------------------------------------------------- |
516 | // TableReader public API |
517 | |
518 | TableReader::TableReader() { impl_.reset(new TableReaderImpl()); } |
519 | |
520 | TableReader::~TableReader() {} |
521 | |
522 | Status TableReader::Open(const std::shared_ptr<io::RandomAccessFile>& source, |
523 | std::unique_ptr<TableReader>* out) { |
524 | out->reset(new TableReader()); |
525 | return (*out)->impl_->Open(source); |
526 | } |
527 | |
528 | bool TableReader::HasDescription() const { return impl_->HasDescription(); } |
529 | |
530 | std::string TableReader::GetDescription() const { return impl_->GetDescription(); } |
531 | |
532 | int TableReader::version() const { return impl_->version(); } |
533 | |
534 | int64_t TableReader::num_rows() const { return impl_->num_rows(); } |
535 | |
536 | int64_t TableReader::num_columns() const { return impl_->num_columns(); } |
537 | |
538 | std::string TableReader::GetColumnName(int i) const { return impl_->GetColumnName(i); } |
539 | |
540 | Status TableReader::GetColumn(int i, std::shared_ptr<Column>* out) { |
541 | return impl_->GetColumn(i, out); |
542 | } |
543 | |
544 | Status TableReader::Read(std::shared_ptr<Table>* out) { return impl_->Read(out); } |
545 | |
546 | Status TableReader::Read(const std::vector<int>& indices, std::shared_ptr<Table>* out) { |
547 | return impl_->Read(indices, out); |
548 | } |
549 | |
550 | Status TableReader::Read(const std::vector<std::string>& names, |
551 | std::shared_ptr<Table>* out) { |
552 | return impl_->Read(names, out); |
553 | } |
554 | |
555 | // ---------------------------------------------------------------------- |
556 | // writer.cc |
557 | |
558 | fbs::Type ToFlatbufferType(Type::type type) { |
559 | switch (type) { |
560 | case Type::BOOL: |
561 | return fbs::Type_BOOL; |
562 | case Type::INT8: |
563 | return fbs::Type_INT8; |
564 | case Type::INT16: |
565 | return fbs::Type_INT16; |
566 | case Type::INT32: |
567 | return fbs::Type_INT32; |
568 | case Type::INT64: |
569 | return fbs::Type_INT64; |
570 | case Type::UINT8: |
571 | return fbs::Type_UINT8; |
572 | case Type::UINT16: |
573 | return fbs::Type_UINT16; |
574 | case Type::UINT32: |
575 | return fbs::Type_UINT32; |
576 | case Type::UINT64: |
577 | return fbs::Type_UINT64; |
578 | case Type::FLOAT: |
579 | return fbs::Type_FLOAT; |
580 | case Type::DOUBLE: |
581 | return fbs::Type_DOUBLE; |
582 | case Type::STRING: |
583 | return fbs::Type_UTF8; |
584 | case Type::BINARY: |
585 | return fbs::Type_BINARY; |
586 | case Type::DATE32: |
587 | return fbs::Type_INT32; |
588 | case Type::TIMESTAMP: |
589 | return fbs::Type_INT64; |
590 | case Type::TIME32: |
591 | return fbs::Type_INT32; |
592 | case Type::TIME64: |
593 | return fbs::Type_INT64; |
594 | default: |
595 | DCHECK(false) << "Cannot reach this code" ; |
596 | } |
597 | // prevent compiler warning |
598 | return fbs::Type_MIN; |
599 | } |
600 | |
601 | static Status SanitizeUnsupportedTypes(const Array& values, std::shared_ptr<Array>* out) { |
602 | if (values.type_id() == Type::NA) { |
603 | // As long as R doesn't support NA, we write this as a StringColumn |
604 | // to ensure stable roundtrips. |
605 | *out = std::make_shared<StringArray>(values.length(), nullptr, nullptr, |
606 | values.null_bitmap(), values.null_count()); |
607 | return Status::OK(); |
608 | } else { |
609 | *out = MakeArray(values.data()); |
610 | return Status::OK(); |
611 | } |
612 | } |
613 | |
614 | class TableWriter::TableWriterImpl : public ArrayVisitor { |
615 | public: |
616 | TableWriterImpl() : initialized_stream_(false), metadata_(0) {} |
617 | |
618 | Status Open(const std::shared_ptr<io::OutputStream>& stream) { |
619 | stream_ = stream; |
620 | return Status::OK(); |
621 | } |
622 | |
623 | void SetDescription(const std::string& desc) { metadata_.SetDescription(desc); } |
624 | |
625 | void SetNumRows(int64_t num_rows) { metadata_.SetNumRows(num_rows); } |
626 | |
627 | Status Finalize() { |
628 | RETURN_NOT_OK(CheckStarted()); |
629 | RETURN_NOT_OK(metadata_.Finish()); |
630 | |
631 | auto buffer = metadata_.GetBuffer(); |
632 | |
633 | // Writer metadata |
634 | int64_t bytes_written; |
635 | RETURN_NOT_OK( |
636 | WritePadded(stream_.get(), buffer->data(), buffer->size(), &bytes_written)); |
637 | uint32_t buffer_size = static_cast<uint32_t>(bytes_written); |
638 | |
639 | // Footer: metadata length, magic bytes |
640 | RETURN_NOT_OK(stream_->Write(&buffer_size, sizeof(uint32_t))); |
641 | return stream_->Write(kFeatherMagicBytes, strlen(kFeatherMagicBytes)); |
642 | } |
643 | |
644 | Status LoadArrayMetadata(const Array& values, ArrayMetadata* meta) { |
645 | if (!(is_primitive(values.type_id()) || is_binary_like(values.type_id()))) { |
646 | return Status::Invalid("Array is not primitive type: " , values.type()->ToString()); |
647 | } |
648 | |
649 | meta->type = ToFlatbufferType(values.type_id()); |
650 | |
651 | RETURN_NOT_OK(stream_->Tell(&meta->offset)); |
652 | |
653 | meta->length = values.length(); |
654 | meta->null_count = values.null_count(); |
655 | meta->total_bytes = 0; |
656 | |
657 | return Status::OK(); |
658 | } |
659 | |
660 | Status WriteArray(const Array& values, ArrayMetadata* meta) { |
661 | RETURN_NOT_OK(CheckStarted()); |
662 | RETURN_NOT_OK(LoadArrayMetadata(values, meta)); |
663 | |
664 | int64_t bytes_written; |
665 | |
666 | // Write the null bitmask |
667 | if (values.null_count() > 0) { |
668 | // We assume there is one bit for each value in values.nulls, |
669 | // starting at the zero offset. |
670 | int64_t null_bitmap_size = GetOutputLength(BitUtil::BytesForBits(values.length())); |
671 | if (values.null_bitmap()) { |
672 | auto null_bitmap = values.null_bitmap(); |
673 | RETURN_NOT_OK(WritePaddedWithOffset(stream_.get(), null_bitmap->data(), |
674 | values.offset(), null_bitmap_size, |
675 | &bytes_written)); |
676 | } else { |
677 | RETURN_NOT_OK(WritePaddedBlank(stream_.get(), null_bitmap_size, &bytes_written)); |
678 | } |
679 | meta->total_bytes += bytes_written; |
680 | } |
681 | |
682 | int64_t values_bytes = 0; |
683 | int64_t bit_offset = 0; |
684 | |
685 | const uint8_t* values_buffer = nullptr; |
686 | |
687 | if (is_binary_like(values.type_id())) { |
688 | const auto& bin_values = checked_cast<const BinaryArray&>(values); |
689 | |
690 | int64_t offset_bytes = sizeof(int32_t) * (values.length() + 1); |
691 | |
692 | if (bin_values.value_offsets()) { |
693 | values_bytes = bin_values.raw_value_offsets()[values.length()]; |
694 | |
695 | // Write the variable-length offsets |
696 | RETURN_NOT_OK( |
697 | WritePadded(stream_.get(), |
698 | reinterpret_cast<const uint8_t*>(bin_values.raw_value_offsets()), |
699 | offset_bytes, &bytes_written)); |
700 | } else { |
701 | RETURN_NOT_OK(WritePaddedBlank(stream_.get(), offset_bytes, &bytes_written)); |
702 | } |
703 | meta->total_bytes += bytes_written; |
704 | |
705 | if (bin_values.value_data()) { |
706 | values_buffer = bin_values.value_data()->data(); |
707 | } |
708 | } else { |
709 | const auto& prim_values = checked_cast<const PrimitiveArray&>(values); |
710 | const auto& fw_type = checked_cast<const FixedWidthType&>(*values.type()); |
711 | |
712 | values_bytes = BitUtil::BytesForBits(values.length() * fw_type.bit_width()); |
713 | |
714 | if (prim_values.values()) { |
715 | values_buffer = prim_values.values()->data() + |
716 | (prim_values.offset() * fw_type.bit_width() / 8); |
717 | bit_offset = (prim_values.offset() * fw_type.bit_width()) % 8; |
718 | } |
719 | } |
720 | if (values_buffer) { |
721 | RETURN_NOT_OK(WritePaddedWithOffset(stream_.get(), values_buffer, bit_offset, |
722 | values_bytes, &bytes_written)); |
723 | } else { |
724 | RETURN_NOT_OK(WritePaddedBlank(stream_.get(), values_bytes, &bytes_written)); |
725 | } |
726 | meta->total_bytes += bytes_written; |
727 | |
728 | return Status::OK(); |
729 | } |
730 | |
731 | Status WritePrimitiveValues(const Array& values) { |
732 | // Prepare metadata payload |
733 | ArrayMetadata meta; |
734 | RETURN_NOT_OK(WriteArray(values, &meta)); |
735 | current_column_->SetValues(meta); |
736 | return Status::OK(); |
737 | } |
738 | |
739 | Status Visit(const NullArray& values) override { |
740 | std::shared_ptr<Array> sanitized_nulls; |
741 | RETURN_NOT_OK(SanitizeUnsupportedTypes(values, &sanitized_nulls)); |
742 | return WritePrimitiveValues(*sanitized_nulls); |
743 | } |
744 | |
745 | #define VISIT_PRIMITIVE(TYPE) \ |
746 | Status Visit(const TYPE& values) override { return WritePrimitiveValues(values); } |
747 | |
748 | VISIT_PRIMITIVE(BooleanArray) |
749 | VISIT_PRIMITIVE(Int8Array) |
750 | VISIT_PRIMITIVE(Int16Array) |
751 | VISIT_PRIMITIVE(Int32Array) |
752 | VISIT_PRIMITIVE(Int64Array) |
753 | VISIT_PRIMITIVE(UInt8Array) |
754 | VISIT_PRIMITIVE(UInt16Array) |
755 | VISIT_PRIMITIVE(UInt32Array) |
756 | VISIT_PRIMITIVE(UInt64Array) |
757 | VISIT_PRIMITIVE(FloatArray) |
758 | VISIT_PRIMITIVE(DoubleArray) |
759 | VISIT_PRIMITIVE(BinaryArray) |
760 | VISIT_PRIMITIVE(StringArray) |
761 | |
762 | #undef VISIT_PRIMITIVE |
763 | |
764 | Status Visit(const DictionaryArray& values) override { |
765 | const auto& dict_type = checked_cast<const DictionaryType&>(*values.type()); |
766 | |
767 | if (!is_integer(values.indices()->type_id())) { |
768 | return Status::Invalid("Category values must be integers" ); |
769 | } |
770 | |
771 | RETURN_NOT_OK(WritePrimitiveValues(*values.indices())); |
772 | |
773 | ArrayMetadata levels_meta; |
774 | std::shared_ptr<Array> sanitized_dictionary; |
775 | RETURN_NOT_OK( |
776 | SanitizeUnsupportedTypes(*dict_type.dictionary(), &sanitized_dictionary)); |
777 | RETURN_NOT_OK(WriteArray(*sanitized_dictionary, &levels_meta)); |
778 | current_column_->SetCategory(levels_meta, dict_type.ordered()); |
779 | return Status::OK(); |
780 | } |
781 | |
782 | Status Visit(const TimestampArray& values) override { |
783 | RETURN_NOT_OK(WritePrimitiveValues(values)); |
784 | const auto& ts_type = checked_cast<const TimestampType&>(*values.type()); |
785 | current_column_->SetTimestamp(ts_type.unit(), ts_type.timezone()); |
786 | return Status::OK(); |
787 | } |
788 | |
789 | Status Visit(const Date32Array& values) override { |
790 | RETURN_NOT_OK(WritePrimitiveValues(values)); |
791 | current_column_->SetDate(); |
792 | return Status::OK(); |
793 | } |
794 | |
795 | Status Visit(const Time32Array& values) override { |
796 | RETURN_NOT_OK(WritePrimitiveValues(values)); |
797 | auto unit = checked_cast<const Time32Type&>(*values.type()).unit(); |
798 | current_column_->SetTime(unit); |
799 | return Status::OK(); |
800 | } |
801 | |
802 | Status Visit(const Time64Array& values) override { |
803 | return Status::NotImplemented("time64" ); |
804 | } |
805 | |
806 | Status Append(const std::string& name, const Array& values) { |
807 | current_column_ = metadata_.AddColumn(name); |
808 | RETURN_NOT_OK(values.Accept(this)); |
809 | return current_column_->Finish(); |
810 | } |
811 | |
812 | Status Write(const Table& table) { |
813 | for (int i = 0; i < table.num_columns(); ++i) { |
814 | auto column = table.column(i); |
815 | current_column_ = metadata_.AddColumn(column->name()); |
816 | auto chunked_array = column->data(); |
817 | for (const auto chunk : chunked_array->chunks()) { |
818 | RETURN_NOT_OK(chunk->Accept(this)); |
819 | } |
820 | RETURN_NOT_OK(current_column_->Finish()); |
821 | } |
822 | return Status::OK(); |
823 | } |
824 | |
825 | private: |
826 | Status CheckStarted() { |
827 | if (!initialized_stream_) { |
828 | int64_t bytes_written_unused; |
829 | RETURN_NOT_OK(WritePadded(stream_.get(), |
830 | reinterpret_cast<const uint8_t*>(kFeatherMagicBytes), |
831 | strlen(kFeatherMagicBytes), &bytes_written_unused)); |
832 | initialized_stream_ = true; |
833 | } |
834 | return Status::OK(); |
835 | } |
836 | |
837 | std::shared_ptr<io::OutputStream> stream_; |
838 | |
839 | bool initialized_stream_; |
840 | TableBuilder metadata_; |
841 | |
842 | std::unique_ptr<ColumnBuilder> current_column_; |
843 | |
844 | Status AppendPrimitive(const PrimitiveArray& values, ArrayMetadata* out); |
845 | }; |
846 | |
847 | TableWriter::TableWriter() { impl_.reset(new TableWriterImpl()); } |
848 | |
849 | TableWriter::~TableWriter() {} |
850 | |
851 | Status TableWriter::Open(const std::shared_ptr<io::OutputStream>& stream, |
852 | std::unique_ptr<TableWriter>* out) { |
853 | out->reset(new TableWriter()); |
854 | return (*out)->impl_->Open(stream); |
855 | } |
856 | |
857 | void TableWriter::SetDescription(const std::string& desc) { impl_->SetDescription(desc); } |
858 | |
859 | void TableWriter::SetNumRows(int64_t num_rows) { impl_->SetNumRows(num_rows); } |
860 | |
861 | Status TableWriter::Append(const std::string& name, const Array& values) { |
862 | return impl_->Append(name, values); |
863 | } |
864 | |
865 | Status TableWriter::Write(const Table& table) { return impl_->Write(table); } |
866 | |
867 | Status TableWriter::Finalize() { return impl_->Finalize(); } |
868 | |
869 | } // namespace feather |
870 | } // namespace ipc |
871 | } // namespace arrow |
872 | |