1 | /* -*- c-basic-offset: 2 -*- */ |
2 | /* |
3 | Copyright(C) 2017 Brazil |
4 | |
5 | This library is free software; you can redistribute it and/or |
6 | modify it under the terms of the GNU Lesser General Public |
7 | License version 2.1 as published by the Free Software Foundation. |
8 | |
9 | This library is distributed in the hope that it will be useful, |
10 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
12 | Lesser General Public License for more details. |
13 | |
14 | You should have received a copy of the GNU Lesser General Public |
15 | License along with this library; if not, write to the Free Software |
16 | Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
17 | */ |
18 | |
#include "grn.h"
#include "grn_db.h"

#ifdef GRN_WITH_ARROW
#include <groonga/arrow.hpp>

#include <arrow/api.h>
#include <arrow/io/file.h>
#include <arrow/ipc/api.h>

#include <cmath>
#include <limits>
#include <sstream>
30 | |
31 | namespace grnarrow { |
32 | grn_rc status_to_rc(arrow::Status &status) { |
33 | switch (status.code()) { |
34 | case arrow::StatusCode::OK: |
35 | return GRN_SUCCESS; |
36 | case arrow::StatusCode::OutOfMemory: |
37 | return GRN_NO_MEMORY_AVAILABLE; |
38 | case arrow::StatusCode::KeyError: |
39 | return GRN_INVALID_ARGUMENT; // TODO |
40 | case arrow::StatusCode::TypeError: |
41 | return GRN_INVALID_ARGUMENT; // TODO |
42 | case arrow::StatusCode::Invalid: |
43 | return GRN_INVALID_ARGUMENT; |
44 | case arrow::StatusCode::IOError: |
45 | return GRN_INPUT_OUTPUT_ERROR; |
46 | case arrow::StatusCode::UnknownError: |
47 | return GRN_UNKNOWN_ERROR; |
48 | case arrow::StatusCode::NotImplemented: |
49 | return GRN_FUNCTION_NOT_IMPLEMENTED; |
50 | default: |
51 | return GRN_UNKNOWN_ERROR; |
52 | } |
53 | } |
54 | |
55 | grn_bool check_status(grn_ctx *ctx, |
56 | arrow::Status &status, |
57 | const char *context) { |
58 | if (status.ok()) { |
59 | return GRN_TRUE; |
60 | } else { |
61 | auto rc = status_to_rc(status); |
62 | auto message = status.ToString(); |
63 | ERR(rc, "%s: %s" , context, message.c_str()); |
64 | return GRN_FALSE; |
65 | } |
66 | } |
67 | |
68 | grn_bool check_status(grn_ctx *ctx, |
69 | arrow::Status &status, |
70 | std::ostream &output) { |
71 | return check_status(ctx, |
72 | status, |
73 | static_cast<std::stringstream &>(output).str().c_str()); |
74 | } |
75 | |
76 | class ColumnLoadVisitor : public arrow::ArrayVisitor { |
77 | public: |
78 | ColumnLoadVisitor(grn_ctx *ctx, |
79 | grn_obj *grn_table, |
80 | std::shared_ptr<arrow::Column> &arrow_column, |
81 | const grn_id *ids) |
82 | : ctx_(ctx), |
83 | grn_table_(grn_table), |
84 | ids_(ids), |
85 | time_unit_(arrow::TimeUnit::SECOND) { |
86 | auto column_name = arrow_column->name(); |
87 | grn_column_ = grn_obj_column(ctx_, grn_table_, |
88 | column_name.data(), |
89 | column_name.size()); |
90 | |
91 | auto arrow_type = arrow_column->type(); |
92 | grn_id type_id; |
93 | switch (arrow_type->id()) { |
94 | case arrow::Type::BOOL : |
95 | type_id = GRN_DB_BOOL; |
96 | break; |
97 | case arrow::Type::UINT8 : |
98 | type_id = GRN_DB_UINT8; |
99 | break; |
100 | case arrow::Type::INT8 : |
101 | type_id = GRN_DB_INT8; |
102 | break; |
103 | case arrow::Type::UINT16 : |
104 | type_id = GRN_DB_UINT16; |
105 | break; |
106 | case arrow::Type::INT16 : |
107 | type_id = GRN_DB_INT16; |
108 | break; |
109 | case arrow::Type::UINT32 : |
110 | type_id = GRN_DB_UINT32; |
111 | break; |
112 | case arrow::Type::INT32 : |
113 | type_id = GRN_DB_INT32; |
114 | break; |
115 | case arrow::Type::UINT64 : |
116 | type_id = GRN_DB_UINT64; |
117 | break; |
118 | case arrow::Type::INT64 : |
119 | type_id = GRN_DB_INT64; |
120 | break; |
121 | case arrow::Type::HALF_FLOAT : |
122 | case arrow::Type::FLOAT : |
123 | case arrow::Type::DOUBLE : |
124 | type_id = GRN_DB_FLOAT; |
125 | break; |
126 | case arrow::Type::STRING : |
127 | type_id = GRN_DB_TEXT; |
128 | break; |
129 | case arrow::Type::DATE64 : |
130 | type_id = GRN_DB_TIME; |
131 | break; |
132 | case arrow::Type::TIMESTAMP : |
133 | type_id = GRN_DB_TIME; |
134 | { |
135 | auto arrow_timestamp_type = |
136 | std::static_pointer_cast<arrow::TimestampType>(arrow_type); |
137 | time_unit_ = arrow_timestamp_type->unit(); |
138 | } |
139 | break; |
140 | default : |
141 | type_id = GRN_DB_VOID; |
142 | break; |
143 | } |
144 | |
145 | if (type_id == GRN_DB_VOID) { |
146 | // TODO |
147 | return; |
148 | } |
149 | |
150 | if (!grn_column_) { |
151 | grn_column_ = grn_column_create(ctx_, |
152 | grn_table_, |
153 | column_name.data(), |
154 | column_name.size(), |
155 | NULL, |
156 | GRN_OBJ_COLUMN_SCALAR, |
157 | grn_ctx_at(ctx_, type_id)); |
158 | } |
159 | if (type_id == GRN_DB_TEXT) { |
160 | GRN_TEXT_INIT(&buffer_, GRN_OBJ_DO_SHALLOW_COPY); |
161 | } else { |
162 | GRN_VALUE_FIX_SIZE_INIT(&buffer_, 0, type_id); |
163 | } |
164 | } |
165 | |
166 | ~ColumnLoadVisitor() { |
167 | if (grn_obj_is_accessor(ctx_, grn_column_)) { |
168 | grn_obj_unlink(ctx_, grn_column_); |
169 | } |
170 | GRN_OBJ_FIN(ctx_, &buffer_); |
171 | } |
172 | |
173 | arrow::Status Visit(const arrow::BooleanArray &array) { |
174 | return set_values(array); |
175 | } |
176 | |
177 | arrow::Status Visit(const arrow::Int8Array &array) { |
178 | return set_values(array); |
179 | } |
180 | |
181 | arrow::Status Visit(const arrow::UInt8Array &array) { |
182 | return set_values(array); |
183 | } |
184 | |
185 | arrow::Status Visit(const arrow::Int16Array &array) { |
186 | return set_values(array); |
187 | } |
188 | |
189 | arrow::Status Visit(const arrow::UInt16Array &array) { |
190 | return set_values(array); |
191 | } |
192 | |
193 | arrow::Status Visit(const arrow::Int32Array &array) { |
194 | return set_values(array); |
195 | } |
196 | |
197 | arrow::Status Visit(const arrow::UInt32Array &array) { |
198 | return set_values(array); |
199 | } |
200 | |
201 | arrow::Status Visit(const arrow::Int64Array &array) { |
202 | return set_values(array); |
203 | } |
204 | |
205 | arrow::Status Visit(const arrow::UInt64Array &array) { |
206 | return set_values(array); |
207 | } |
208 | |
209 | arrow::Status Visit(const arrow::HalfFloatArray &array) { |
210 | return set_values(array); |
211 | } |
212 | |
213 | arrow::Status Visit(const arrow::FloatArray &array) { |
214 | return set_values(array); |
215 | } |
216 | |
217 | arrow::Status Visit(const arrow::DoubleArray &array) { |
218 | return set_values(array); |
219 | } |
220 | |
221 | arrow::Status Visit(const arrow::StringArray &array) { |
222 | return set_values(array); |
223 | } |
224 | |
225 | arrow::Status Visit(const arrow::Date64Array &array) { |
226 | return set_values(array); |
227 | } |
228 | |
229 | arrow::Status Visit(const arrow::TimestampArray &array) { |
230 | return set_values(array); |
231 | } |
232 | |
233 | private: |
234 | grn_ctx *ctx_; |
235 | grn_obj *grn_table_; |
236 | const grn_id *ids_; |
237 | arrow::TimeUnit::type time_unit_; |
238 | grn_obj *grn_column_; |
239 | grn_obj buffer_; |
240 | |
241 | template <typename T> |
242 | arrow::Status set_values(const T &array) { |
243 | int64_t n_rows = array.length(); |
244 | for (int i = 0; i < n_rows; ++i) { |
245 | auto id = ids_[i]; |
246 | GRN_BULK_REWIND(&buffer_); |
247 | get_value(array, i); |
248 | grn_obj_set_value(ctx_, grn_column_, id, &buffer_, GRN_OBJ_SET); |
249 | } |
250 | return arrow::Status::OK(); |
251 | } |
252 | |
253 | void |
254 | get_value(const arrow::BooleanArray &array, int i) { |
255 | GRN_BOOL_SET(ctx_, &buffer_, array.Value(i)); |
256 | } |
257 | |
258 | void |
259 | get_value(const arrow::UInt8Array &array, int i) { |
260 | GRN_UINT8_SET(ctx_, &buffer_, array.Value(i)); |
261 | } |
262 | |
263 | void |
264 | get_value(const arrow::Int8Array &array, int i) { |
265 | GRN_INT8_SET(ctx_, &buffer_, array.Value(i)); |
266 | } |
267 | |
268 | void |
269 | get_value(const arrow::UInt16Array &array, int i) { |
270 | GRN_UINT16_SET(ctx_, &buffer_, array.Value(i)); |
271 | } |
272 | |
273 | void |
274 | get_value(const arrow::Int16Array &array, int i) { |
275 | GRN_INT16_SET(ctx_, &buffer_, array.Value(i)); |
276 | } |
277 | |
278 | void |
279 | get_value(const arrow::UInt32Array &array, int i) { |
280 | GRN_UINT32_SET(ctx_, &buffer_, array.Value(i)); |
281 | } |
282 | |
283 | void |
284 | get_value(const arrow::Int32Array &array, int i) { |
285 | GRN_INT32_SET(ctx_, &buffer_, array.Value(i)); |
286 | } |
287 | |
288 | void |
289 | get_value(const arrow::UInt64Array &array, int i) { |
290 | GRN_UINT64_SET(ctx_, &buffer_, array.Value(i)); |
291 | } |
292 | |
293 | void |
294 | get_value(const arrow::Int64Array &array, int i) { |
295 | GRN_INT64_SET(ctx_, &buffer_, array.Value(i)); |
296 | } |
297 | |
298 | void |
299 | get_value(const arrow::HalfFloatArray &array, int i) { |
300 | GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i)); |
301 | } |
302 | |
303 | void |
304 | get_value(const arrow::FloatArray &array, int i) { |
305 | GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i)); |
306 | } |
307 | |
308 | void |
309 | get_value(const arrow::DoubleArray &array, int i) { |
310 | GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i)); |
311 | } |
312 | |
313 | void |
314 | get_value(const arrow::StringArray &array, int i) { |
315 | int32_t size; |
316 | const auto data = array.GetValue(i, &size); |
317 | GRN_TEXT_SET(ctx_, &buffer_, data, size); |
318 | } |
319 | |
320 | void |
321 | get_value(const arrow::Date64Array &array, int i) { |
322 | GRN_TIME_SET(ctx_, &buffer_, array.Value(i)); |
323 | } |
324 | |
325 | void |
326 | get_value(const arrow::TimestampArray &array, int i) { |
327 | switch (time_unit_) { |
328 | case arrow::TimeUnit::SECOND : |
329 | GRN_TIME_SET(ctx_, &buffer_, GRN_TIME_PACK(array.Value(i), 0)); |
330 | break; |
331 | case arrow::TimeUnit::MILLI : |
332 | GRN_TIME_SET(ctx_, &buffer_, array.Value(i) * 1000); |
333 | break; |
334 | case arrow::TimeUnit::MICRO : |
335 | GRN_TIME_SET(ctx_, &buffer_, array.Value(i)); |
336 | break; |
337 | case arrow::TimeUnit::NANO : |
338 | GRN_TIME_SET(ctx_, &buffer_, array.Value(i) / 1000); |
339 | break; |
340 | } |
341 | } |
342 | }; |
343 | |
344 | class FileLoader { |
345 | public: |
346 | FileLoader(grn_ctx *ctx, grn_obj *grn_table) |
347 | : ctx_(ctx), |
348 | grn_table_(grn_table), |
349 | key_column_name_("" ) { |
350 | } |
351 | |
352 | ~FileLoader() { |
353 | } |
354 | |
355 | grn_rc load_table(const std::shared_ptr<arrow::Table> &arrow_table) { |
356 | int n_columns = arrow_table->num_columns(); |
357 | |
358 | if (key_column_name_.empty()) { |
359 | grn_obj ids; |
360 | GRN_RECORD_INIT(&ids, GRN_OBJ_VECTOR, grn_obj_id(ctx_, grn_table_)); |
361 | auto n_records = arrow_table->num_rows(); |
362 | for (int64_t i = 0; i < n_records; ++i) { |
363 | auto id = grn_table_add(ctx_, grn_table_, NULL, 0, NULL); |
364 | GRN_RECORD_PUT(ctx_, &ids, id); |
365 | } |
366 | for (int i = 0; i < n_columns; ++i) { |
367 | int64_t offset = 0; |
368 | auto arrow_column = arrow_table->column(i); |
369 | auto arrow_chunked_data = arrow_column->data(); |
370 | for (auto arrow_array : arrow_chunked_data->chunks()) { |
371 | grn_id *sub_ids = |
372 | reinterpret_cast<grn_id *>(GRN_BULK_HEAD(&ids)) + offset; |
373 | ColumnLoadVisitor visitor(ctx_, |
374 | grn_table_, |
375 | arrow_column, |
376 | sub_ids); |
377 | arrow_array->Accept(&visitor); |
378 | offset += arrow_array->length(); |
379 | } |
380 | } |
381 | GRN_OBJ_FIN(ctx_, &ids); |
382 | } else { |
383 | auto status = arrow::Status::NotImplemented("_key isn't supported yet" ); |
384 | check_status(ctx_, status, "[arrow][load]" ); |
385 | } |
386 | return ctx_->rc; |
387 | }; |
388 | |
389 | grn_rc load_record_batch(const std::shared_ptr<arrow::RecordBatch> &arrow_record_batch) { |
390 | std::shared_ptr<arrow::Table> arrow_table; |
391 | std::vector<std::shared_ptr<arrow::RecordBatch>> arrow_record_batches(1); |
392 | arrow_record_batches[0] = arrow_record_batch; |
393 | auto status = |
394 | arrow::Table::FromRecordBatches(arrow_record_batches, &arrow_table); |
395 | if (!check_status(ctx_, |
396 | status, |
397 | "[arrow][load] " |
398 | "failed to convert record batch to table" )) { |
399 | return ctx_->rc; |
400 | } |
401 | return load_table(arrow_table); |
402 | }; |
403 | |
404 | private: |
405 | grn_ctx *ctx_; |
406 | grn_obj *grn_table_; |
407 | std::string key_column_name_; |
408 | }; |
409 | |
410 | class FileDumper { |
411 | public: |
412 | FileDumper(grn_ctx *ctx, grn_obj *grn_table, grn_obj *grn_columns) |
413 | : ctx_(ctx), |
414 | grn_table_(grn_table), |
415 | grn_columns_(grn_columns) { |
416 | } |
417 | |
418 | ~FileDumper() { |
419 | } |
420 | |
421 | grn_rc dump(arrow::io::OutputStream *output) { |
422 | std::vector<std::shared_ptr<arrow::Field>> fields; |
423 | auto n_columns = GRN_BULK_VSIZE(grn_columns_) / sizeof(grn_obj *); |
424 | for (auto i = 0; i < n_columns; ++i) { |
425 | auto column = GRN_PTR_VALUE_AT(grn_columns_, i); |
426 | |
427 | char column_name[GRN_TABLE_MAX_KEY_SIZE]; |
428 | int column_name_size; |
429 | column_name_size = |
430 | grn_column_name(ctx_, column, column_name, GRN_TABLE_MAX_KEY_SIZE); |
431 | std::string field_name(column_name, column_name_size); |
432 | std::shared_ptr<arrow::DataType> field_type; |
433 | switch (grn_obj_get_range(ctx_, column)) { |
434 | case GRN_DB_BOOL : |
435 | field_type = arrow::boolean(); |
436 | break; |
437 | case GRN_DB_UINT8 : |
438 | field_type = arrow::uint8(); |
439 | break; |
440 | case GRN_DB_INT8 : |
441 | field_type = arrow::int8(); |
442 | break; |
443 | case GRN_DB_UINT16 : |
444 | field_type = arrow::uint16(); |
445 | break; |
446 | case GRN_DB_INT16 : |
447 | field_type = arrow::int16(); |
448 | break; |
449 | case GRN_DB_UINT32 : |
450 | field_type = arrow::uint32(); |
451 | break; |
452 | case GRN_DB_INT32 : |
453 | field_type = arrow::int32(); |
454 | break; |
455 | case GRN_DB_UINT64 : |
456 | field_type = arrow::uint64(); |
457 | break; |
458 | case GRN_DB_INT64 : |
459 | field_type = arrow::int64(); |
460 | break; |
461 | case GRN_DB_FLOAT : |
462 | field_type = arrow::float64(); |
463 | break; |
464 | case GRN_DB_TIME : |
465 | field_type = |
466 | std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO); |
467 | break; |
468 | case GRN_DB_SHORT_TEXT : |
469 | case GRN_DB_TEXT : |
470 | case GRN_DB_LONG_TEXT : |
471 | field_type = arrow::utf8(); |
472 | break; |
473 | default : |
474 | break; |
475 | } |
476 | if (!field_type) { |
477 | continue; |
478 | } |
479 | |
480 | auto field = std::make_shared<arrow::Field>(field_name, |
481 | field_type, |
482 | false); |
483 | fields.push_back(field); |
484 | }; |
485 | |
486 | auto schema = std::make_shared<arrow::Schema>(fields); |
487 | |
488 | std::shared_ptr<arrow::ipc::RecordBatchFileWriter> writer; |
489 | auto status = |
490 | arrow::ipc::RecordBatchFileWriter::Open(output, schema, &writer); |
491 | if (!check_status(ctx_, |
492 | status, |
493 | "[arrow][dump] failed to create file format writer" )) { |
494 | return ctx_->rc; |
495 | } |
496 | |
497 | std::vector<grn_id> ids; |
498 | int n_records_per_batch = 1000; |
499 | GRN_TABLE_EACH_BEGIN(ctx_, grn_table_, table_cursor, record_id) { |
500 | ids.push_back(record_id); |
501 | if (ids.size() == n_records_per_batch) { |
502 | write_record_batch(ids, schema, writer); |
503 | ids.clear(); |
504 | } |
505 | } GRN_TABLE_EACH_END(ctx_, table_cursor); |
506 | if (!ids.empty()) { |
507 | write_record_batch(ids, schema, writer); |
508 | } |
509 | writer->Close(); |
510 | |
511 | return ctx_->rc; |
512 | } |
513 | |
514 | private: |
515 | grn_ctx *ctx_; |
516 | grn_obj *grn_table_; |
517 | grn_obj *grn_columns_; |
518 | |
519 | void write_record_batch(std::vector<grn_id> &ids, |
520 | std::shared_ptr<arrow::Schema> &schema, |
521 | std::shared_ptr<arrow::ipc::RecordBatchFileWriter> &writer) { |
522 | std::vector<std::shared_ptr<arrow::Array>> columns; |
523 | auto n_columns = GRN_BULK_VSIZE(grn_columns_) / sizeof(grn_obj *); |
524 | for (auto i = 0; i < n_columns; ++i) { |
525 | auto grn_column = GRN_PTR_VALUE_AT(grn_columns_, i); |
526 | |
527 | arrow::Status status; |
528 | std::shared_ptr<arrow::Array> column; |
529 | |
530 | switch (grn_obj_get_range(ctx_, grn_column)) { |
531 | case GRN_DB_BOOL : |
532 | status = build_boolean_array(ids, grn_column, &column); |
533 | break; |
534 | case GRN_DB_UINT8 : |
535 | status = build_uint8_array(ids, grn_column, &column); |
536 | break; |
537 | case GRN_DB_INT8 : |
538 | status = build_int8_array(ids, grn_column, &column); |
539 | break; |
540 | case GRN_DB_UINT16 : |
541 | status = build_uint16_array(ids, grn_column, &column); |
542 | break; |
543 | case GRN_DB_INT16 : |
544 | status = build_int16_array(ids, grn_column, &column); |
545 | break; |
546 | case GRN_DB_UINT32 : |
547 | status = build_uint32_array(ids, grn_column, &column); |
548 | break; |
549 | case GRN_DB_INT32 : |
550 | status = build_int32_array(ids, grn_column, &column); |
551 | break; |
552 | case GRN_DB_UINT64 : |
553 | status = build_uint64_array(ids, grn_column, &column); |
554 | break; |
555 | case GRN_DB_INT64 : |
556 | status = build_int64_array(ids, grn_column, &column); |
557 | break; |
558 | case GRN_DB_FLOAT : |
559 | status = build_double_array(ids, grn_column, &column); |
560 | break; |
561 | case GRN_DB_TIME : |
562 | status = build_timestamp_array(ids, grn_column, &column); |
563 | break; |
564 | case GRN_DB_SHORT_TEXT : |
565 | case GRN_DB_TEXT : |
566 | case GRN_DB_LONG_TEXT : |
567 | status = build_utf8_array(ids, grn_column, &column); |
568 | break; |
569 | default : |
570 | status = |
571 | arrow::Status::NotImplemented("[arrow][dumper] not supported type: TODO" ); |
572 | break; |
573 | } |
574 | if (!status.ok()) { |
575 | continue; |
576 | } |
577 | columns.push_back(column); |
578 | } |
579 | |
580 | arrow::RecordBatch record_batch(schema, ids.size(), columns); |
581 | writer->WriteRecordBatch(record_batch); |
582 | } |
583 | |
584 | arrow::Status build_boolean_array(std::vector<grn_id> &ids, |
585 | grn_obj *grn_column, |
586 | std::shared_ptr<arrow::Array> *array) { |
587 | arrow::BooleanBuilder builder(arrow::default_memory_pool()); |
588 | for (auto id : ids) { |
589 | uint32_t size; |
590 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
591 | builder.Append(*(reinterpret_cast<const grn_bool *>(data))); |
592 | } |
593 | return builder.Finish(array); |
594 | } |
595 | |
596 | arrow::Status build_uint8_array(std::vector<grn_id> &ids, |
597 | grn_obj *grn_column, |
598 | std::shared_ptr<arrow::Array> *array) { |
599 | arrow::UInt8Builder builder(arrow::default_memory_pool()); |
600 | for (auto id : ids) { |
601 | uint32_t size; |
602 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
603 | builder.Append(*(reinterpret_cast<const uint8_t *>(data))); |
604 | } |
605 | return builder.Finish(array); |
606 | } |
607 | |
608 | arrow::Status build_int8_array(std::vector<grn_id> &ids, |
609 | grn_obj *grn_column, |
610 | std::shared_ptr<arrow::Array> *array) { |
611 | arrow::Int8Builder builder(arrow::default_memory_pool()); |
612 | for (auto id : ids) { |
613 | uint32_t size; |
614 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
615 | builder.Append(*(reinterpret_cast<const int8_t *>(data))); |
616 | } |
617 | return builder.Finish(array); |
618 | } |
619 | |
620 | arrow::Status build_uint16_array(std::vector<grn_id> &ids, |
621 | grn_obj *grn_column, |
622 | std::shared_ptr<arrow::Array> *array) { |
623 | arrow::UInt16Builder builder(arrow::default_memory_pool()); |
624 | for (auto id : ids) { |
625 | uint32_t size; |
626 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
627 | builder.Append(*(reinterpret_cast<const uint16_t *>(data))); |
628 | } |
629 | return builder.Finish(array); |
630 | } |
631 | |
632 | arrow::Status build_int16_array(std::vector<grn_id> &ids, |
633 | grn_obj *grn_column, |
634 | std::shared_ptr<arrow::Array> *array) { |
635 | arrow::Int16Builder builder(arrow::default_memory_pool()); |
636 | for (auto id : ids) { |
637 | uint32_t size; |
638 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
639 | builder.Append(*(reinterpret_cast<const int16_t *>(data))); |
640 | } |
641 | return builder.Finish(array); |
642 | } |
643 | |
644 | arrow::Status build_uint32_array(std::vector<grn_id> &ids, |
645 | grn_obj *grn_column, |
646 | std::shared_ptr<arrow::Array> *array) { |
647 | arrow::UInt32Builder builder(arrow::default_memory_pool()); |
648 | for (auto id : ids) { |
649 | uint32_t size; |
650 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
651 | builder.Append(*(reinterpret_cast<const uint32_t *>(data))); |
652 | } |
653 | return builder.Finish(array); |
654 | } |
655 | |
656 | arrow::Status build_int32_array(std::vector<grn_id> &ids, |
657 | grn_obj *grn_column, |
658 | std::shared_ptr<arrow::Array> *array) { |
659 | arrow::Int32Builder builder(arrow::default_memory_pool()); |
660 | for (auto id : ids) { |
661 | uint32_t size; |
662 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
663 | builder.Append(*(reinterpret_cast<const int32_t *>(data))); |
664 | } |
665 | return builder.Finish(array); |
666 | } |
667 | arrow::Status build_uint64_array(std::vector<grn_id> &ids, |
668 | grn_obj *grn_column, |
669 | std::shared_ptr<arrow::Array> *array) { |
670 | arrow::UInt64Builder builder(arrow::default_memory_pool()); |
671 | for (auto id : ids) { |
672 | uint32_t size; |
673 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
674 | builder.Append(*(reinterpret_cast<const uint64_t *>(data))); |
675 | } |
676 | return builder.Finish(array); |
677 | } |
678 | |
679 | arrow::Status build_int64_array(std::vector<grn_id> &ids, |
680 | grn_obj *grn_column, |
681 | std::shared_ptr<arrow::Array> *array) { |
682 | arrow::Int64Builder builder(arrow::default_memory_pool()); |
683 | for (auto id : ids) { |
684 | uint32_t size; |
685 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
686 | builder.Append(*(reinterpret_cast<const int64_t *>(data))); |
687 | } |
688 | return builder.Finish(array); |
689 | } |
690 | |
691 | arrow::Status build_double_array(std::vector<grn_id> &ids, |
692 | grn_obj *grn_column, |
693 | std::shared_ptr<arrow::Array> *array) { |
694 | arrow::DoubleBuilder builder(arrow::default_memory_pool()); |
695 | for (auto id : ids) { |
696 | uint32_t size; |
697 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
698 | builder.Append(*(reinterpret_cast<const double *>(data))); |
699 | } |
700 | return builder.Finish(array); |
701 | } |
702 | |
703 | arrow::Status build_timestamp_array(std::vector<grn_id> &ids, |
704 | grn_obj *grn_column, |
705 | std::shared_ptr<arrow::Array> *array) { |
706 | auto timestamp_ns_data_type = |
707 | std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO); |
708 | arrow::TimestampBuilder builder(arrow::default_memory_pool(), |
709 | timestamp_ns_data_type); |
710 | for (auto id : ids) { |
711 | uint32_t size; |
712 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
713 | auto timestamp_ns = *(reinterpret_cast<const int64_t *>(data)); |
714 | builder.Append(timestamp_ns); |
715 | } |
716 | return builder.Finish(array); |
717 | } |
718 | |
719 | arrow::Status build_utf8_array(std::vector<grn_id> &ids, |
720 | grn_obj *grn_column, |
721 | std::shared_ptr<arrow::Array> *array) { |
722 | arrow::StringBuilder builder(arrow::default_memory_pool()); |
723 | for (auto id : ids) { |
724 | uint32_t size; |
725 | auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); |
726 | builder.Append(data, size); |
727 | } |
728 | return builder.Finish(array); |
729 | } |
730 | }; |
731 | } |
732 | #endif /* GRN_WITH_ARROW */ |
733 | |
734 | extern "C" { |
735 | grn_rc |
736 | grn_arrow_load(grn_ctx *ctx, |
737 | grn_obj *table, |
738 | const char *path) |
739 | { |
740 | GRN_API_ENTER; |
741 | #ifdef GRN_WITH_ARROW |
742 | std::shared_ptr<arrow::io::MemoryMappedFile> input; |
743 | auto status = |
744 | arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ, &input); |
745 | if (!grnarrow::check_status(ctx, |
746 | status, |
747 | std::ostringstream() << |
748 | "[arrow][load] failed to open path: " << |
749 | "<" << path << ">" )) { |
750 | GRN_API_RETURN(ctx->rc); |
751 | } |
752 | std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader; |
753 | status = arrow::ipc::RecordBatchFileReader::Open(input, &reader); |
754 | if (!grnarrow::check_status(ctx, |
755 | status, |
756 | "[arrow][load] " |
757 | "failed to create file format reader" )) { |
758 | GRN_API_RETURN(ctx->rc); |
759 | } |
760 | |
761 | grnarrow::FileLoader loader(ctx, table); |
762 | int n_record_batches = reader->num_record_batches(); |
763 | for (int i = 0; i < n_record_batches; ++i) { |
764 | std::shared_ptr<arrow::RecordBatch> record_batch; |
765 | status = reader->ReadRecordBatch(i, &record_batch); |
766 | if (!grnarrow::check_status(ctx, |
767 | status, |
768 | std::ostringstream("" ) << |
769 | "[arrow][load] failed to get " << |
770 | "the " << i << "-th " << "record" )) { |
771 | break; |
772 | } |
773 | loader.load_record_batch(record_batch); |
774 | if (ctx->rc != GRN_SUCCESS) { |
775 | break; |
776 | } |
777 | } |
778 | #else /* GRN_WITH_ARROW */ |
779 | ERR(GRN_FUNCTION_NOT_IMPLEMENTED, |
780 | "[arrow][load] Apache Arrow support isn't enabled" ); |
781 | #endif /* GRN_WITH_ARROW */ |
782 | GRN_API_RETURN(ctx->rc); |
783 | } |
784 | |
785 | grn_rc |
786 | grn_arrow_dump(grn_ctx *ctx, |
787 | grn_obj *table, |
788 | const char *path) |
789 | { |
790 | GRN_API_ENTER; |
791 | #ifdef GRN_WITH_ARROW |
792 | auto all_columns = |
793 | grn_hash_create(ctx, |
794 | NULL, |
795 | sizeof(grn_id), |
796 | 0, |
797 | GRN_OBJ_TABLE_HASH_KEY | GRN_HASH_TINY); |
798 | grn_table_columns(ctx, |
799 | table, |
800 | "" , 0, |
801 | reinterpret_cast<grn_obj *>(all_columns)); |
802 | |
803 | grn_obj columns; |
804 | GRN_PTR_INIT(&columns, GRN_OBJ_VECTOR, GRN_ID_NIL); |
805 | GRN_HASH_EACH_BEGIN(ctx, all_columns, cursor, id) { |
806 | void *key; |
807 | grn_hash_cursor_get_key(ctx, cursor, &key); |
808 | auto column_id = static_cast<grn_id *>(key); |
809 | auto column = grn_ctx_at(ctx, *column_id); |
810 | GRN_PTR_PUT(ctx, &columns, column); |
811 | } GRN_HASH_EACH_END(ctx, cursor); |
812 | grn_hash_close(ctx, all_columns); |
813 | |
814 | grn_arrow_dump_columns(ctx, table, &columns, path); |
815 | GRN_OBJ_FIN(ctx, &columns); |
816 | #else /* GRN_WITH_ARROW */ |
817 | ERR(GRN_FUNCTION_NOT_IMPLEMENTED, |
818 | "[arrow][dump] Apache Arrow support isn't enabled" ); |
819 | #endif /* GRN_WITH_ARROW */ |
820 | GRN_API_RETURN(ctx->rc); |
821 | } |
822 | |
823 | grn_rc |
824 | grn_arrow_dump_columns(grn_ctx *ctx, |
825 | grn_obj *table, |
826 | grn_obj *columns, |
827 | const char *path) |
828 | { |
829 | GRN_API_ENTER; |
830 | #ifdef GRN_WITH_ARROW |
831 | std::shared_ptr<arrow::io::FileOutputStream> output; |
832 | auto status = arrow::io::FileOutputStream::Open(path, &output); |
833 | if (!grnarrow::check_status(ctx, |
834 | status, |
835 | std::stringstream() << |
836 | "[arrow][dump] failed to open path: " << |
837 | "<" << path << ">" )) { |
838 | GRN_API_RETURN(ctx->rc); |
839 | } |
840 | |
841 | grnarrow::FileDumper dumper(ctx, table, columns); |
842 | dumper.dump(output.get()); |
843 | #else /* GRN_WITH_ARROW */ |
844 | ERR(GRN_FUNCTION_NOT_IMPLEMENTED, |
845 | "[arrow][dump] Apache Arrow support isn't enabled" ); |
846 | #endif /* GRN_WITH_ARROW */ |
847 | GRN_API_RETURN(ctx->rc); |
848 | } |
849 | } |
850 | |