1/* -*- c-basic-offset: 2 -*- */
2/*
3 Copyright(C) 2017 Brazil
4
5 This library is free software; you can redistribute it and/or
6 modify it under the terms of the GNU Lesser General Public
7 License version 2.1 as published by the Free Software Foundation.
8
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17*/
18
19#include "grn.h"
20#include "grn_db.h"
21
22#ifdef GRN_WITH_ARROW
23#include <groonga/arrow.hpp>
24
25#include <arrow/api.h>
26#include <arrow/io/file.h>
27#include <arrow/ipc/api.h>
28
29#include <sstream>
30
31namespace grnarrow {
32 grn_rc status_to_rc(arrow::Status &status) {
33 switch (status.code()) {
34 case arrow::StatusCode::OK:
35 return GRN_SUCCESS;
36 case arrow::StatusCode::OutOfMemory:
37 return GRN_NO_MEMORY_AVAILABLE;
38 case arrow::StatusCode::KeyError:
39 return GRN_INVALID_ARGUMENT; // TODO
40 case arrow::StatusCode::TypeError:
41 return GRN_INVALID_ARGUMENT; // TODO
42 case arrow::StatusCode::Invalid:
43 return GRN_INVALID_ARGUMENT;
44 case arrow::StatusCode::IOError:
45 return GRN_INPUT_OUTPUT_ERROR;
46 case arrow::StatusCode::UnknownError:
47 return GRN_UNKNOWN_ERROR;
48 case arrow::StatusCode::NotImplemented:
49 return GRN_FUNCTION_NOT_IMPLEMENTED;
50 default:
51 return GRN_UNKNOWN_ERROR;
52 }
53 }
54
55 grn_bool check_status(grn_ctx *ctx,
56 arrow::Status &status,
57 const char *context) {
58 if (status.ok()) {
59 return GRN_TRUE;
60 } else {
61 auto rc = status_to_rc(status);
62 auto message = status.ToString();
63 ERR(rc, "%s: %s", context, message.c_str());
64 return GRN_FALSE;
65 }
66 }
67
68 grn_bool check_status(grn_ctx *ctx,
69 arrow::Status &status,
70 std::ostream &output) {
71 return check_status(ctx,
72 status,
73 static_cast<std::stringstream &>(output).str().c_str());
74 }
75
76 class ColumnLoadVisitor : public arrow::ArrayVisitor {
77 public:
78 ColumnLoadVisitor(grn_ctx *ctx,
79 grn_obj *grn_table,
80 std::shared_ptr<arrow::Column> &arrow_column,
81 const grn_id *ids)
82 : ctx_(ctx),
83 grn_table_(grn_table),
84 ids_(ids),
85 time_unit_(arrow::TimeUnit::SECOND) {
86 auto column_name = arrow_column->name();
87 grn_column_ = grn_obj_column(ctx_, grn_table_,
88 column_name.data(),
89 column_name.size());
90
91 auto arrow_type = arrow_column->type();
92 grn_id type_id;
93 switch (arrow_type->id()) {
94 case arrow::Type::BOOL :
95 type_id = GRN_DB_BOOL;
96 break;
97 case arrow::Type::UINT8 :
98 type_id = GRN_DB_UINT8;
99 break;
100 case arrow::Type::INT8 :
101 type_id = GRN_DB_INT8;
102 break;
103 case arrow::Type::UINT16 :
104 type_id = GRN_DB_UINT16;
105 break;
106 case arrow::Type::INT16 :
107 type_id = GRN_DB_INT16;
108 break;
109 case arrow::Type::UINT32 :
110 type_id = GRN_DB_UINT32;
111 break;
112 case arrow::Type::INT32 :
113 type_id = GRN_DB_INT32;
114 break;
115 case arrow::Type::UINT64 :
116 type_id = GRN_DB_UINT64;
117 break;
118 case arrow::Type::INT64 :
119 type_id = GRN_DB_INT64;
120 break;
121 case arrow::Type::HALF_FLOAT :
122 case arrow::Type::FLOAT :
123 case arrow::Type::DOUBLE :
124 type_id = GRN_DB_FLOAT;
125 break;
126 case arrow::Type::STRING :
127 type_id = GRN_DB_TEXT;
128 break;
129 case arrow::Type::DATE64 :
130 type_id = GRN_DB_TIME;
131 break;
132 case arrow::Type::TIMESTAMP :
133 type_id = GRN_DB_TIME;
134 {
135 auto arrow_timestamp_type =
136 std::static_pointer_cast<arrow::TimestampType>(arrow_type);
137 time_unit_ = arrow_timestamp_type->unit();
138 }
139 break;
140 default :
141 type_id = GRN_DB_VOID;
142 break;
143 }
144
145 if (type_id == GRN_DB_VOID) {
146 // TODO
147 return;
148 }
149
150 if (!grn_column_) {
151 grn_column_ = grn_column_create(ctx_,
152 grn_table_,
153 column_name.data(),
154 column_name.size(),
155 NULL,
156 GRN_OBJ_COLUMN_SCALAR,
157 grn_ctx_at(ctx_, type_id));
158 }
159 if (type_id == GRN_DB_TEXT) {
160 GRN_TEXT_INIT(&buffer_, GRN_OBJ_DO_SHALLOW_COPY);
161 } else {
162 GRN_VALUE_FIX_SIZE_INIT(&buffer_, 0, type_id);
163 }
164 }
165
166 ~ColumnLoadVisitor() {
167 if (grn_obj_is_accessor(ctx_, grn_column_)) {
168 grn_obj_unlink(ctx_, grn_column_);
169 }
170 GRN_OBJ_FIN(ctx_, &buffer_);
171 }
172
173 arrow::Status Visit(const arrow::BooleanArray &array) {
174 return set_values(array);
175 }
176
177 arrow::Status Visit(const arrow::Int8Array &array) {
178 return set_values(array);
179 }
180
181 arrow::Status Visit(const arrow::UInt8Array &array) {
182 return set_values(array);
183 }
184
185 arrow::Status Visit(const arrow::Int16Array &array) {
186 return set_values(array);
187 }
188
189 arrow::Status Visit(const arrow::UInt16Array &array) {
190 return set_values(array);
191 }
192
193 arrow::Status Visit(const arrow::Int32Array &array) {
194 return set_values(array);
195 }
196
197 arrow::Status Visit(const arrow::UInt32Array &array) {
198 return set_values(array);
199 }
200
201 arrow::Status Visit(const arrow::Int64Array &array) {
202 return set_values(array);
203 }
204
205 arrow::Status Visit(const arrow::UInt64Array &array) {
206 return set_values(array);
207 }
208
209 arrow::Status Visit(const arrow::HalfFloatArray &array) {
210 return set_values(array);
211 }
212
213 arrow::Status Visit(const arrow::FloatArray &array) {
214 return set_values(array);
215 }
216
217 arrow::Status Visit(const arrow::DoubleArray &array) {
218 return set_values(array);
219 }
220
221 arrow::Status Visit(const arrow::StringArray &array) {
222 return set_values(array);
223 }
224
225 arrow::Status Visit(const arrow::Date64Array &array) {
226 return set_values(array);
227 }
228
229 arrow::Status Visit(const arrow::TimestampArray &array) {
230 return set_values(array);
231 }
232
233 private:
234 grn_ctx *ctx_;
235 grn_obj *grn_table_;
236 const grn_id *ids_;
237 arrow::TimeUnit::type time_unit_;
238 grn_obj *grn_column_;
239 grn_obj buffer_;
240
241 template <typename T>
242 arrow::Status set_values(const T &array) {
243 int64_t n_rows = array.length();
244 for (int i = 0; i < n_rows; ++i) {
245 auto id = ids_[i];
246 GRN_BULK_REWIND(&buffer_);
247 get_value(array, i);
248 grn_obj_set_value(ctx_, grn_column_, id, &buffer_, GRN_OBJ_SET);
249 }
250 return arrow::Status::OK();
251 }
252
253 void
254 get_value(const arrow::BooleanArray &array, int i) {
255 GRN_BOOL_SET(ctx_, &buffer_, array.Value(i));
256 }
257
258 void
259 get_value(const arrow::UInt8Array &array, int i) {
260 GRN_UINT8_SET(ctx_, &buffer_, array.Value(i));
261 }
262
263 void
264 get_value(const arrow::Int8Array &array, int i) {
265 GRN_INT8_SET(ctx_, &buffer_, array.Value(i));
266 }
267
268 void
269 get_value(const arrow::UInt16Array &array, int i) {
270 GRN_UINT16_SET(ctx_, &buffer_, array.Value(i));
271 }
272
273 void
274 get_value(const arrow::Int16Array &array, int i) {
275 GRN_INT16_SET(ctx_, &buffer_, array.Value(i));
276 }
277
278 void
279 get_value(const arrow::UInt32Array &array, int i) {
280 GRN_UINT32_SET(ctx_, &buffer_, array.Value(i));
281 }
282
283 void
284 get_value(const arrow::Int32Array &array, int i) {
285 GRN_INT32_SET(ctx_, &buffer_, array.Value(i));
286 }
287
288 void
289 get_value(const arrow::UInt64Array &array, int i) {
290 GRN_UINT64_SET(ctx_, &buffer_, array.Value(i));
291 }
292
293 void
294 get_value(const arrow::Int64Array &array, int i) {
295 GRN_INT64_SET(ctx_, &buffer_, array.Value(i));
296 }
297
298 void
299 get_value(const arrow::HalfFloatArray &array, int i) {
300 GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
301 }
302
303 void
304 get_value(const arrow::FloatArray &array, int i) {
305 GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
306 }
307
308 void
309 get_value(const arrow::DoubleArray &array, int i) {
310 GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
311 }
312
313 void
314 get_value(const arrow::StringArray &array, int i) {
315 int32_t size;
316 const auto data = array.GetValue(i, &size);
317 GRN_TEXT_SET(ctx_, &buffer_, data, size);
318 }
319
320 void
321 get_value(const arrow::Date64Array &array, int i) {
322 GRN_TIME_SET(ctx_, &buffer_, array.Value(i));
323 }
324
325 void
326 get_value(const arrow::TimestampArray &array, int i) {
327 switch (time_unit_) {
328 case arrow::TimeUnit::SECOND :
329 GRN_TIME_SET(ctx_, &buffer_, GRN_TIME_PACK(array.Value(i), 0));
330 break;
331 case arrow::TimeUnit::MILLI :
332 GRN_TIME_SET(ctx_, &buffer_, array.Value(i) * 1000);
333 break;
334 case arrow::TimeUnit::MICRO :
335 GRN_TIME_SET(ctx_, &buffer_, array.Value(i));
336 break;
337 case arrow::TimeUnit::NANO :
338 GRN_TIME_SET(ctx_, &buffer_, array.Value(i) / 1000);
339 break;
340 }
341 }
342 };
343
344 class FileLoader {
345 public:
346 FileLoader(grn_ctx *ctx, grn_obj *grn_table)
347 : ctx_(ctx),
348 grn_table_(grn_table),
349 key_column_name_("") {
350 }
351
352 ~FileLoader() {
353 }
354
355 grn_rc load_table(const std::shared_ptr<arrow::Table> &arrow_table) {
356 int n_columns = arrow_table->num_columns();
357
358 if (key_column_name_.empty()) {
359 grn_obj ids;
360 GRN_RECORD_INIT(&ids, GRN_OBJ_VECTOR, grn_obj_id(ctx_, grn_table_));
361 auto n_records = arrow_table->num_rows();
362 for (int64_t i = 0; i < n_records; ++i) {
363 auto id = grn_table_add(ctx_, grn_table_, NULL, 0, NULL);
364 GRN_RECORD_PUT(ctx_, &ids, id);
365 }
366 for (int i = 0; i < n_columns; ++i) {
367 int64_t offset = 0;
368 auto arrow_column = arrow_table->column(i);
369 auto arrow_chunked_data = arrow_column->data();
370 for (auto arrow_array : arrow_chunked_data->chunks()) {
371 grn_id *sub_ids =
372 reinterpret_cast<grn_id *>(GRN_BULK_HEAD(&ids)) + offset;
373 ColumnLoadVisitor visitor(ctx_,
374 grn_table_,
375 arrow_column,
376 sub_ids);
377 arrow_array->Accept(&visitor);
378 offset += arrow_array->length();
379 }
380 }
381 GRN_OBJ_FIN(ctx_, &ids);
382 } else {
383 auto status = arrow::Status::NotImplemented("_key isn't supported yet");
384 check_status(ctx_, status, "[arrow][load]");
385 }
386 return ctx_->rc;
387 };
388
389 grn_rc load_record_batch(const std::shared_ptr<arrow::RecordBatch> &arrow_record_batch) {
390 std::shared_ptr<arrow::Table> arrow_table;
391 std::vector<std::shared_ptr<arrow::RecordBatch>> arrow_record_batches(1);
392 arrow_record_batches[0] = arrow_record_batch;
393 auto status =
394 arrow::Table::FromRecordBatches(arrow_record_batches, &arrow_table);
395 if (!check_status(ctx_,
396 status,
397 "[arrow][load] "
398 "failed to convert record batch to table")) {
399 return ctx_->rc;
400 }
401 return load_table(arrow_table);
402 };
403
404 private:
405 grn_ctx *ctx_;
406 grn_obj *grn_table_;
407 std::string key_column_name_;
408 };
409
410 class FileDumper {
411 public:
412 FileDumper(grn_ctx *ctx, grn_obj *grn_table, grn_obj *grn_columns)
413 : ctx_(ctx),
414 grn_table_(grn_table),
415 grn_columns_(grn_columns) {
416 }
417
418 ~FileDumper() {
419 }
420
421 grn_rc dump(arrow::io::OutputStream *output) {
422 std::vector<std::shared_ptr<arrow::Field>> fields;
423 auto n_columns = GRN_BULK_VSIZE(grn_columns_) / sizeof(grn_obj *);
424 for (auto i = 0; i < n_columns; ++i) {
425 auto column = GRN_PTR_VALUE_AT(grn_columns_, i);
426
427 char column_name[GRN_TABLE_MAX_KEY_SIZE];
428 int column_name_size;
429 column_name_size =
430 grn_column_name(ctx_, column, column_name, GRN_TABLE_MAX_KEY_SIZE);
431 std::string field_name(column_name, column_name_size);
432 std::shared_ptr<arrow::DataType> field_type;
433 switch (grn_obj_get_range(ctx_, column)) {
434 case GRN_DB_BOOL :
435 field_type = arrow::boolean();
436 break;
437 case GRN_DB_UINT8 :
438 field_type = arrow::uint8();
439 break;
440 case GRN_DB_INT8 :
441 field_type = arrow::int8();
442 break;
443 case GRN_DB_UINT16 :
444 field_type = arrow::uint16();
445 break;
446 case GRN_DB_INT16 :
447 field_type = arrow::int16();
448 break;
449 case GRN_DB_UINT32 :
450 field_type = arrow::uint32();
451 break;
452 case GRN_DB_INT32 :
453 field_type = arrow::int32();
454 break;
455 case GRN_DB_UINT64 :
456 field_type = arrow::uint64();
457 break;
458 case GRN_DB_INT64 :
459 field_type = arrow::int64();
460 break;
461 case GRN_DB_FLOAT :
462 field_type = arrow::float64();
463 break;
464 case GRN_DB_TIME :
465 field_type =
466 std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
467 break;
468 case GRN_DB_SHORT_TEXT :
469 case GRN_DB_TEXT :
470 case GRN_DB_LONG_TEXT :
471 field_type = arrow::utf8();
472 break;
473 default :
474 break;
475 }
476 if (!field_type) {
477 continue;
478 }
479
480 auto field = std::make_shared<arrow::Field>(field_name,
481 field_type,
482 false);
483 fields.push_back(field);
484 };
485
486 auto schema = std::make_shared<arrow::Schema>(fields);
487
488 std::shared_ptr<arrow::ipc::RecordBatchFileWriter> writer;
489 auto status =
490 arrow::ipc::RecordBatchFileWriter::Open(output, schema, &writer);
491 if (!check_status(ctx_,
492 status,
493 "[arrow][dump] failed to create file format writer")) {
494 return ctx_->rc;
495 }
496
497 std::vector<grn_id> ids;
498 int n_records_per_batch = 1000;
499 GRN_TABLE_EACH_BEGIN(ctx_, grn_table_, table_cursor, record_id) {
500 ids.push_back(record_id);
501 if (ids.size() == n_records_per_batch) {
502 write_record_batch(ids, schema, writer);
503 ids.clear();
504 }
505 } GRN_TABLE_EACH_END(ctx_, table_cursor);
506 if (!ids.empty()) {
507 write_record_batch(ids, schema, writer);
508 }
509 writer->Close();
510
511 return ctx_->rc;
512 }
513
514 private:
515 grn_ctx *ctx_;
516 grn_obj *grn_table_;
517 grn_obj *grn_columns_;
518
519 void write_record_batch(std::vector<grn_id> &ids,
520 std::shared_ptr<arrow::Schema> &schema,
521 std::shared_ptr<arrow::ipc::RecordBatchFileWriter> &writer) {
522 std::vector<std::shared_ptr<arrow::Array>> columns;
523 auto n_columns = GRN_BULK_VSIZE(grn_columns_) / sizeof(grn_obj *);
524 for (auto i = 0; i < n_columns; ++i) {
525 auto grn_column = GRN_PTR_VALUE_AT(grn_columns_, i);
526
527 arrow::Status status;
528 std::shared_ptr<arrow::Array> column;
529
530 switch (grn_obj_get_range(ctx_, grn_column)) {
531 case GRN_DB_BOOL :
532 status = build_boolean_array(ids, grn_column, &column);
533 break;
534 case GRN_DB_UINT8 :
535 status = build_uint8_array(ids, grn_column, &column);
536 break;
537 case GRN_DB_INT8 :
538 status = build_int8_array(ids, grn_column, &column);
539 break;
540 case GRN_DB_UINT16 :
541 status = build_uint16_array(ids, grn_column, &column);
542 break;
543 case GRN_DB_INT16 :
544 status = build_int16_array(ids, grn_column, &column);
545 break;
546 case GRN_DB_UINT32 :
547 status = build_uint32_array(ids, grn_column, &column);
548 break;
549 case GRN_DB_INT32 :
550 status = build_int32_array(ids, grn_column, &column);
551 break;
552 case GRN_DB_UINT64 :
553 status = build_uint64_array(ids, grn_column, &column);
554 break;
555 case GRN_DB_INT64 :
556 status = build_int64_array(ids, grn_column, &column);
557 break;
558 case GRN_DB_FLOAT :
559 status = build_double_array(ids, grn_column, &column);
560 break;
561 case GRN_DB_TIME :
562 status = build_timestamp_array(ids, grn_column, &column);
563 break;
564 case GRN_DB_SHORT_TEXT :
565 case GRN_DB_TEXT :
566 case GRN_DB_LONG_TEXT :
567 status = build_utf8_array(ids, grn_column, &column);
568 break;
569 default :
570 status =
571 arrow::Status::NotImplemented("[arrow][dumper] not supported type: TODO");
572 break;
573 }
574 if (!status.ok()) {
575 continue;
576 }
577 columns.push_back(column);
578 }
579
580 arrow::RecordBatch record_batch(schema, ids.size(), columns);
581 writer->WriteRecordBatch(record_batch);
582 }
583
584 arrow::Status build_boolean_array(std::vector<grn_id> &ids,
585 grn_obj *grn_column,
586 std::shared_ptr<arrow::Array> *array) {
587 arrow::BooleanBuilder builder(arrow::default_memory_pool());
588 for (auto id : ids) {
589 uint32_t size;
590 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
591 builder.Append(*(reinterpret_cast<const grn_bool *>(data)));
592 }
593 return builder.Finish(array);
594 }
595
596 arrow::Status build_uint8_array(std::vector<grn_id> &ids,
597 grn_obj *grn_column,
598 std::shared_ptr<arrow::Array> *array) {
599 arrow::UInt8Builder builder(arrow::default_memory_pool());
600 for (auto id : ids) {
601 uint32_t size;
602 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
603 builder.Append(*(reinterpret_cast<const uint8_t *>(data)));
604 }
605 return builder.Finish(array);
606 }
607
608 arrow::Status build_int8_array(std::vector<grn_id> &ids,
609 grn_obj *grn_column,
610 std::shared_ptr<arrow::Array> *array) {
611 arrow::Int8Builder builder(arrow::default_memory_pool());
612 for (auto id : ids) {
613 uint32_t size;
614 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
615 builder.Append(*(reinterpret_cast<const int8_t *>(data)));
616 }
617 return builder.Finish(array);
618 }
619
620 arrow::Status build_uint16_array(std::vector<grn_id> &ids,
621 grn_obj *grn_column,
622 std::shared_ptr<arrow::Array> *array) {
623 arrow::UInt16Builder builder(arrow::default_memory_pool());
624 for (auto id : ids) {
625 uint32_t size;
626 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
627 builder.Append(*(reinterpret_cast<const uint16_t *>(data)));
628 }
629 return builder.Finish(array);
630 }
631
632 arrow::Status build_int16_array(std::vector<grn_id> &ids,
633 grn_obj *grn_column,
634 std::shared_ptr<arrow::Array> *array) {
635 arrow::Int16Builder builder(arrow::default_memory_pool());
636 for (auto id : ids) {
637 uint32_t size;
638 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
639 builder.Append(*(reinterpret_cast<const int16_t *>(data)));
640 }
641 return builder.Finish(array);
642 }
643
644 arrow::Status build_uint32_array(std::vector<grn_id> &ids,
645 grn_obj *grn_column,
646 std::shared_ptr<arrow::Array> *array) {
647 arrow::UInt32Builder builder(arrow::default_memory_pool());
648 for (auto id : ids) {
649 uint32_t size;
650 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
651 builder.Append(*(reinterpret_cast<const uint32_t *>(data)));
652 }
653 return builder.Finish(array);
654 }
655
656 arrow::Status build_int32_array(std::vector<grn_id> &ids,
657 grn_obj *grn_column,
658 std::shared_ptr<arrow::Array> *array) {
659 arrow::Int32Builder builder(arrow::default_memory_pool());
660 for (auto id : ids) {
661 uint32_t size;
662 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
663 builder.Append(*(reinterpret_cast<const int32_t *>(data)));
664 }
665 return builder.Finish(array);
666 }
667 arrow::Status build_uint64_array(std::vector<grn_id> &ids,
668 grn_obj *grn_column,
669 std::shared_ptr<arrow::Array> *array) {
670 arrow::UInt64Builder builder(arrow::default_memory_pool());
671 for (auto id : ids) {
672 uint32_t size;
673 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
674 builder.Append(*(reinterpret_cast<const uint64_t *>(data)));
675 }
676 return builder.Finish(array);
677 }
678
679 arrow::Status build_int64_array(std::vector<grn_id> &ids,
680 grn_obj *grn_column,
681 std::shared_ptr<arrow::Array> *array) {
682 arrow::Int64Builder builder(arrow::default_memory_pool());
683 for (auto id : ids) {
684 uint32_t size;
685 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
686 builder.Append(*(reinterpret_cast<const int64_t *>(data)));
687 }
688 return builder.Finish(array);
689 }
690
691 arrow::Status build_double_array(std::vector<grn_id> &ids,
692 grn_obj *grn_column,
693 std::shared_ptr<arrow::Array> *array) {
694 arrow::DoubleBuilder builder(arrow::default_memory_pool());
695 for (auto id : ids) {
696 uint32_t size;
697 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
698 builder.Append(*(reinterpret_cast<const double *>(data)));
699 }
700 return builder.Finish(array);
701 }
702
703 arrow::Status build_timestamp_array(std::vector<grn_id> &ids,
704 grn_obj *grn_column,
705 std::shared_ptr<arrow::Array> *array) {
706 auto timestamp_ns_data_type =
707 std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
708 arrow::TimestampBuilder builder(arrow::default_memory_pool(),
709 timestamp_ns_data_type);
710 for (auto id : ids) {
711 uint32_t size;
712 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
713 auto timestamp_ns = *(reinterpret_cast<const int64_t *>(data));
714 builder.Append(timestamp_ns);
715 }
716 return builder.Finish(array);
717 }
718
719 arrow::Status build_utf8_array(std::vector<grn_id> &ids,
720 grn_obj *grn_column,
721 std::shared_ptr<arrow::Array> *array) {
722 arrow::StringBuilder builder(arrow::default_memory_pool());
723 for (auto id : ids) {
724 uint32_t size;
725 auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
726 builder.Append(data, size);
727 }
728 return builder.Finish(array);
729 }
730 };
731}
732#endif /* GRN_WITH_ARROW */
733
734extern "C" {
735grn_rc
736grn_arrow_load(grn_ctx *ctx,
737 grn_obj *table,
738 const char *path)
739{
740 GRN_API_ENTER;
741#ifdef GRN_WITH_ARROW
742 std::shared_ptr<arrow::io::MemoryMappedFile> input;
743 auto status =
744 arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ, &input);
745 if (!grnarrow::check_status(ctx,
746 status,
747 std::ostringstream() <<
748 "[arrow][load] failed to open path: " <<
749 "<" << path << ">")) {
750 GRN_API_RETURN(ctx->rc);
751 }
752 std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader;
753 status = arrow::ipc::RecordBatchFileReader::Open(input, &reader);
754 if (!grnarrow::check_status(ctx,
755 status,
756 "[arrow][load] "
757 "failed to create file format reader")) {
758 GRN_API_RETURN(ctx->rc);
759 }
760
761 grnarrow::FileLoader loader(ctx, table);
762 int n_record_batches = reader->num_record_batches();
763 for (int i = 0; i < n_record_batches; ++i) {
764 std::shared_ptr<arrow::RecordBatch> record_batch;
765 status = reader->ReadRecordBatch(i, &record_batch);
766 if (!grnarrow::check_status(ctx,
767 status,
768 std::ostringstream("") <<
769 "[arrow][load] failed to get " <<
770 "the " << i << "-th " << "record")) {
771 break;
772 }
773 loader.load_record_batch(record_batch);
774 if (ctx->rc != GRN_SUCCESS) {
775 break;
776 }
777 }
778#else /* GRN_WITH_ARROW */
779 ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
780 "[arrow][load] Apache Arrow support isn't enabled");
781#endif /* GRN_WITH_ARROW */
782 GRN_API_RETURN(ctx->rc);
783}
784
785grn_rc
786grn_arrow_dump(grn_ctx *ctx,
787 grn_obj *table,
788 const char *path)
789{
790 GRN_API_ENTER;
791#ifdef GRN_WITH_ARROW
792 auto all_columns =
793 grn_hash_create(ctx,
794 NULL,
795 sizeof(grn_id),
796 0,
797 GRN_OBJ_TABLE_HASH_KEY | GRN_HASH_TINY);
798 grn_table_columns(ctx,
799 table,
800 "", 0,
801 reinterpret_cast<grn_obj *>(all_columns));
802
803 grn_obj columns;
804 GRN_PTR_INIT(&columns, GRN_OBJ_VECTOR, GRN_ID_NIL);
805 GRN_HASH_EACH_BEGIN(ctx, all_columns, cursor, id) {
806 void *key;
807 grn_hash_cursor_get_key(ctx, cursor, &key);
808 auto column_id = static_cast<grn_id *>(key);
809 auto column = grn_ctx_at(ctx, *column_id);
810 GRN_PTR_PUT(ctx, &columns, column);
811 } GRN_HASH_EACH_END(ctx, cursor);
812 grn_hash_close(ctx, all_columns);
813
814 grn_arrow_dump_columns(ctx, table, &columns, path);
815 GRN_OBJ_FIN(ctx, &columns);
816#else /* GRN_WITH_ARROW */
817 ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
818 "[arrow][dump] Apache Arrow support isn't enabled");
819#endif /* GRN_WITH_ARROW */
820 GRN_API_RETURN(ctx->rc);
821}
822
823grn_rc
824grn_arrow_dump_columns(grn_ctx *ctx,
825 grn_obj *table,
826 grn_obj *columns,
827 const char *path)
828{
829 GRN_API_ENTER;
830#ifdef GRN_WITH_ARROW
831 std::shared_ptr<arrow::io::FileOutputStream> output;
832 auto status = arrow::io::FileOutputStream::Open(path, &output);
833 if (!grnarrow::check_status(ctx,
834 status,
835 std::stringstream() <<
836 "[arrow][dump] failed to open path: " <<
837 "<" << path << ">")) {
838 GRN_API_RETURN(ctx->rc);
839 }
840
841 grnarrow::FileDumper dumper(ctx, table, columns);
842 dumper.dump(output.get());
843#else /* GRN_WITH_ARROW */
844 ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
845 "[arrow][dump] Apache Arrow support isn't enabled");
846#endif /* GRN_WITH_ARROW */
847 GRN_API_RETURN(ctx->rc);
848}
849}
850