1#include "duckdb/common/arrow/arrow_appender.hpp"
2#include "duckdb/common/arrow/arrow_buffer.hpp"
3#include "duckdb/common/vector.hpp"
4#include "duckdb/common/array.hpp"
5#include "duckdb/common/types/interval.hpp"
6#include "duckdb/common/types/uuid.hpp"
7#include "duckdb/function/table/arrow.hpp"
8
9namespace duckdb {
10
11//===--------------------------------------------------------------------===//
12// Arrow append data
13//===--------------------------------------------------------------------===//
14typedef void (*initialize_t)(ArrowAppendData &result, const LogicalType &type, idx_t capacity);
15typedef void (*append_vector_t)(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size);
16typedef void (*finalize_t)(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result);
17
18struct ArrowAppendData {
19 explicit ArrowAppendData(ArrowOptions &options_p) : options(options_p) {
20 }
21 // the buffers of the arrow vector
22 ArrowBuffer validity;
23 ArrowBuffer main_buffer;
24 ArrowBuffer aux_buffer;
25
26 idx_t row_count = 0;
27 idx_t null_count = 0;
28
29 // function pointers for construction
30 initialize_t initialize = nullptr;
31 append_vector_t append_vector = nullptr;
32 finalize_t finalize = nullptr;
33
34 // child data (if any)
35 vector<unique_ptr<ArrowAppendData>> child_data;
36
37 // the arrow array C API data, only set after Finalize
38 unique_ptr<ArrowArray> array;
39 duckdb::array<const void *, 3> buffers = {._M_elems: {nullptr, nullptr, nullptr}};
40 vector<ArrowArray *> child_pointers;
41
42 ArrowOptions options;
43};
44
45//===--------------------------------------------------------------------===//
46// ArrowAppender
47//===--------------------------------------------------------------------===//
48static unique_ptr<ArrowAppendData> InitializeArrowChild(const LogicalType &type, idx_t capacity, ArrowOptions &options);
49static ArrowArray *FinalizeArrowChild(const LogicalType &type, ArrowAppendData &append_data);
50
51ArrowAppender::ArrowAppender(vector<LogicalType> types_p, idx_t initial_capacity, ArrowOptions options)
52 : types(std::move(types_p)) {
53 for (auto &type : types) {
54 auto entry = InitializeArrowChild(type, capacity: initial_capacity, options);
55 root_data.push_back(x: std::move(entry));
56 }
57}
58
59ArrowAppender::~ArrowAppender() {
60}
61
62//===--------------------------------------------------------------------===//
63// Append Helper Functions
64//===--------------------------------------------------------------------===//
65static void GetBitPosition(idx_t row_idx, idx_t &current_byte, uint8_t &current_bit) {
66 current_byte = row_idx / 8;
67 current_bit = row_idx % 8;
68}
69
70static void UnsetBit(uint8_t *data, idx_t current_byte, uint8_t current_bit) {
71 data[current_byte] &= ~((uint64_t)1 << current_bit);
72}
73
74static void NextBit(idx_t &current_byte, uint8_t &current_bit) {
75 current_bit++;
76 if (current_bit == 8) {
77 current_byte++;
78 current_bit = 0;
79 }
80}
81
82static void ResizeValidity(ArrowBuffer &buffer, idx_t row_count) {
83 auto byte_count = (row_count + 7) / 8;
84 buffer.resize(bytes: byte_count, value: 0xFF);
85}
86
87static void SetNull(ArrowAppendData &append_data, uint8_t *validity_data, idx_t current_byte, uint8_t current_bit) {
88 UnsetBit(data: validity_data, current_byte, current_bit);
89 append_data.null_count++;
90}
91
92static void AppendValidity(ArrowAppendData &append_data, UnifiedVectorFormat &format, idx_t from, idx_t to) {
93 // resize the buffer, filling the validity buffer with all valid values
94 idx_t size = to - from;
95 ResizeValidity(buffer&: append_data.validity, row_count: append_data.row_count + size);
96 if (format.validity.AllValid()) {
97 // if all values are valid we don't need to do anything else
98 return;
99 }
100
101 // otherwise we iterate through the validity mask
102 auto validity_data = (uint8_t *)append_data.validity.data();
103 uint8_t current_bit;
104 idx_t current_byte;
105 GetBitPosition(row_idx: append_data.row_count, current_byte, current_bit);
106 for (idx_t i = from; i < to; i++) {
107 auto source_idx = format.sel->get_index(idx: i);
108 // append the validity mask
109 if (!format.validity.RowIsValid(row_idx: source_idx)) {
110 SetNull(append_data, validity_data, current_byte, current_bit);
111 }
112 NextBit(current_byte, current_bit);
113 }
114}
115
116//===--------------------------------------------------------------------===//
117// Scalar Types
118//===--------------------------------------------------------------------===//
//! Pass-through conversion: each source value is returned as TGT via implicit conversion.
struct ArrowScalarConverter {
	template <class TGT, class SRC>
	static TGT Operation(SRC value) {
		return value;
	}

	//! NULL rows are converted like any other row; only the validity bitmap marks them.
	static bool SkipNulls() {
		return false;
	}

	//! No-op (never called, because SkipNulls() is false).
	template <class TGT>
	static void SetNull(TGT &target) {
	}
};
133
134struct ArrowIntervalConverter {
135 template <class TGT, class SRC>
136 static TGT Operation(SRC input) {
137 ArrowInterval result;
138 result.months = input.months;
139 result.days = input.days;
140 result.nanoseconds = input.micros * Interval::NANOS_PER_MICRO;
141 return result;
142 }
143
144 static bool SkipNulls() {
145 return true;
146 }
147
148 template <class TGT>
149 static void SetNull(TGT &value) {
150 }
151};
152
153template <class TGT, class SRC = TGT, class OP = ArrowScalarConverter>
154struct ArrowScalarBaseData {
155 static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) {
156 idx_t size = to - from;
157 UnifiedVectorFormat format;
158 input.ToUnifiedFormat(count: input_size, data&: format);
159
160 // append the validity mask
161 AppendValidity(append_data, format, from, to);
162
163 // append the main data
164 append_data.main_buffer.resize(bytes: append_data.main_buffer.size() + sizeof(TGT) * size);
165 auto data = UnifiedVectorFormat::GetData<SRC>(format);
166 auto result_data = append_data.main_buffer.GetData<TGT>();
167
168 for (idx_t i = from; i < to; i++) {
169 auto source_idx = format.sel->get_index(idx: i);
170 auto result_idx = append_data.row_count + i - from;
171
172 if (OP::SkipNulls() && !format.validity.RowIsValid(row_idx: source_idx)) {
173 OP::template SetNull<TGT>(result_data[result_idx]);
174 continue;
175 }
176 result_data[result_idx] = OP::template Operation<TGT, SRC>(data[source_idx]);
177 }
178 append_data.row_count += size;
179 }
180};
181
182template <class TGT, class SRC = TGT, class OP = ArrowScalarConverter>
183struct ArrowScalarData : public ArrowScalarBaseData<TGT, SRC, OP> {
184 static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) {
185 result.main_buffer.reserve(bytes: capacity * sizeof(TGT));
186 }
187
188 static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) {
189 result->n_buffers = 2;
190 result->buffers[1] = append_data.main_buffer.data();
191 }
192};
193
194//===--------------------------------------------------------------------===//
195// Enums
196//===--------------------------------------------------------------------===//
197template <class TGT>
198struct ArrowEnumData : public ArrowScalarBaseData<TGT> {
199 static idx_t GetLength(string_t input) {
200 return input.GetSize();
201 }
202 static void WriteData(data_ptr_t target, string_t input) {
203 memcpy(dest: target, src: input.GetData(), n: input.GetSize());
204 }
205 static void EnumAppendVector(ArrowAppendData &append_data, const Vector &input, idx_t size) {
206 D_ASSERT(input.GetVectorType() == VectorType::FLAT_VECTOR);
207
208 // resize the validity mask and set up the validity buffer for iteration
209 ResizeValidity(buffer&: append_data.validity, row_count: append_data.row_count + size);
210
211 // resize the offset buffer - the offset buffer holds the offsets into the child array
212 append_data.main_buffer.resize(bytes: append_data.main_buffer.size() + sizeof(uint32_t) * (size + 1));
213 auto data = FlatVector::GetData<string_t>(vector: input);
214 auto offset_data = append_data.main_buffer.GetData<uint32_t>();
215 if (append_data.row_count == 0) {
216 // first entry
217 offset_data[0] = 0;
218 }
219 // now append the string data to the auxiliary buffer
220 // the auxiliary buffer's length depends on the string lengths, so we resize as required
221 auto last_offset = offset_data[append_data.row_count];
222 for (idx_t i = 0; i < size; i++) {
223 auto offset_idx = append_data.row_count + i + 1;
224
225 auto string_length = GetLength(input: data[i]);
226
227 // append the offset data
228 auto current_offset = last_offset + string_length;
229 offset_data[offset_idx] = current_offset;
230
231 // resize the string buffer if required, and write the string data
232 append_data.aux_buffer.resize(bytes: current_offset);
233 WriteData(target: append_data.aux_buffer.data() + last_offset, input: data[i]);
234
235 last_offset = current_offset;
236 }
237 append_data.row_count += size;
238 }
239 static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) {
240 result.main_buffer.reserve(bytes: capacity * sizeof(TGT));
241 // construct the enum child data
242 auto enum_data = InitializeArrowChild(type: LogicalType::VARCHAR, capacity: EnumType::GetSize(type), options&: result.options);
243 EnumAppendVector(append_data&: *enum_data, input: EnumType::GetValuesInsertOrder(type), size: EnumType::GetSize(type));
244 result.child_data.push_back(x: std::move(enum_data));
245 }
246
247 static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) {
248 result->n_buffers = 2;
249 result->buffers[1] = append_data.main_buffer.data();
250 // finalize the enum child data, and assign it to the dictionary
251 result->dictionary = FinalizeArrowChild(type: LogicalType::VARCHAR, append_data&: *append_data.child_data[0]);
252 }
253};
254
255//===--------------------------------------------------------------------===//
256// Boolean
257//===--------------------------------------------------------------------===//
258struct ArrowBoolData {
259 static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) {
260 auto byte_count = (capacity + 7) / 8;
261 result.main_buffer.reserve(bytes: byte_count);
262 }
263
264 static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) {
265 idx_t size = to - from;
266 UnifiedVectorFormat format;
267 input.ToUnifiedFormat(count: input_size, data&: format);
268
269 // we initialize both the validity and the bit set to 1's
270 ResizeValidity(buffer&: append_data.validity, row_count: append_data.row_count + size);
271 ResizeValidity(buffer&: append_data.main_buffer, row_count: append_data.row_count + size);
272 auto data = UnifiedVectorFormat::GetData<bool>(format);
273
274 auto result_data = append_data.main_buffer.GetData<uint8_t>();
275 auto validity_data = append_data.validity.GetData<uint8_t>();
276 uint8_t current_bit;
277 idx_t current_byte;
278 GetBitPosition(row_idx: append_data.row_count, current_byte, current_bit);
279 for (idx_t i = from; i < to; i++) {
280 auto source_idx = format.sel->get_index(idx: i);
281 // append the validity mask
282 if (!format.validity.RowIsValid(row_idx: source_idx)) {
283 SetNull(append_data, validity_data, current_byte, current_bit);
284 } else if (!data[source_idx]) {
285 UnsetBit(data: result_data, current_byte, current_bit);
286 }
287 NextBit(current_byte, current_bit);
288 }
289 append_data.row_count += size;
290 }
291
292 static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) {
293 result->n_buffers = 2;
294 result->buffers[1] = append_data.main_buffer.data();
295 }
296};
297
298//===--------------------------------------------------------------------===//
299// Varchar
300//===--------------------------------------------------------------------===//
301struct ArrowVarcharConverter {
302 template <class SRC>
303 static idx_t GetLength(SRC input) {
304 return input.GetSize();
305 }
306
307 template <class SRC>
308 static void WriteData(data_ptr_t target, SRC input) {
309 memcpy(target, input.GetData(), input.GetSize());
310 }
311};
312
313struct ArrowUUIDConverter {
314 template <class SRC>
315 static idx_t GetLength(SRC input) {
316 return UUID::STRING_SIZE;
317 }
318
319 template <class SRC>
320 static void WriteData(data_ptr_t target, SRC input) {
321 UUID::ToString(input, char_ptr_cast(src: target));
322 }
323};
324
325template <class SRC = string_t, class OP = ArrowVarcharConverter, class BUFTYPE = uint64_t>
326struct ArrowVarcharData {
327 static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) {
328 result.main_buffer.reserve(bytes: (capacity + 1) * sizeof(BUFTYPE));
329
330 result.aux_buffer.reserve(bytes: capacity);
331 }
332
333 static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) {
334 idx_t size = to - from;
335 UnifiedVectorFormat format;
336 input.ToUnifiedFormat(count: input_size, data&: format);
337
338 // resize the validity mask and set up the validity buffer for iteration
339 ResizeValidity(buffer&: append_data.validity, row_count: append_data.row_count + size);
340 auto validity_data = (uint8_t *)append_data.validity.data();
341
342 // resize the offset buffer - the offset buffer holds the offsets into the child array
343 append_data.main_buffer.resize(bytes: append_data.main_buffer.size() + sizeof(BUFTYPE) * (size + 1));
344 auto data = UnifiedVectorFormat::GetData<SRC>(format);
345 auto offset_data = append_data.main_buffer.GetData<BUFTYPE>();
346 if (append_data.row_count == 0) {
347 // first entry
348 offset_data[0] = 0;
349 }
350 // now append the string data to the auxiliary buffer
351 // the auxiliary buffer's length depends on the string lengths, so we resize as required
352 auto last_offset = offset_data[append_data.row_count];
353 idx_t max_offset = append_data.row_count + to - from;
354 if (max_offset > NumericLimits<uint32_t>::Maximum() &&
355 append_data.options.offset_size == ArrowOffsetSize::REGULAR) {
356 throw InvalidInputException("Arrow Appender: The maximum total string size for regular string buffers is "
357 "%u but the offset of %lu exceeds this.",
358 NumericLimits<uint32_t>::Maximum(), max_offset);
359 }
360 for (idx_t i = from; i < to; i++) {
361 auto source_idx = format.sel->get_index(idx: i);
362 auto offset_idx = append_data.row_count + i + 1 - from;
363
364 if (!format.validity.RowIsValid(row_idx: source_idx)) {
365 uint8_t current_bit;
366 idx_t current_byte;
367 GetBitPosition(row_idx: append_data.row_count + i - from, current_byte, current_bit);
368 SetNull(append_data, validity_data, current_byte, current_bit);
369 offset_data[offset_idx] = last_offset;
370 continue;
371 }
372
373 auto string_length = OP::GetLength(data[source_idx]);
374
375 // append the offset data
376 auto current_offset = last_offset + string_length;
377 offset_data[offset_idx] = current_offset;
378
379 // resize the string buffer if required, and write the string data
380 append_data.aux_buffer.resize(current_offset);
381 OP::WriteData(append_data.aux_buffer.data() + last_offset, data[source_idx]);
382
383 last_offset = current_offset;
384 }
385 append_data.row_count += size;
386 }
387
388 static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) {
389 result->n_buffers = 3;
390 result->buffers[1] = append_data.main_buffer.data();
391 result->buffers[2] = append_data.aux_buffer.data();
392 }
393};
394
395//===--------------------------------------------------------------------===//
396// Structs
397//===--------------------------------------------------------------------===//
398struct ArrowStructData {
399 static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) {
400 auto &children = StructType::GetChildTypes(type);
401 for (auto &child : children) {
402 auto child_buffer = InitializeArrowChild(type: child.second, capacity, options&: result.options);
403 result.child_data.push_back(x: std::move(child_buffer));
404 }
405 }
406
407 static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) {
408 UnifiedVectorFormat format;
409 input.ToUnifiedFormat(count: input_size, data&: format);
410 idx_t size = to - from;
411 AppendValidity(append_data, format, from, to);
412 // append the children of the struct
413 auto &children = StructVector::GetEntries(vector&: input);
414 for (idx_t child_idx = 0; child_idx < children.size(); child_idx++) {
415 auto &child = children[child_idx];
416 auto &child_data = *append_data.child_data[child_idx];
417 child_data.append_vector(child_data, *child, from, to, size);
418 }
419 append_data.row_count += size;
420 }
421
422 static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) {
423 result->n_buffers = 1;
424
425 auto &child_types = StructType::GetChildTypes(type);
426 append_data.child_pointers.resize(new_size: child_types.size());
427 result->children = append_data.child_pointers.data();
428 result->n_children = child_types.size();
429 for (idx_t i = 0; i < child_types.size(); i++) {
430 auto &child_type = child_types[i].second;
431 append_data.child_pointers[i] = FinalizeArrowChild(type: child_type, append_data&: *append_data.child_data[i]);
432 }
433 }
434};
435
436//===--------------------------------------------------------------------===//
437// Lists
438//===--------------------------------------------------------------------===//
439void AppendListOffsets(ArrowAppendData &append_data, UnifiedVectorFormat &format, idx_t from, idx_t to,
440 vector<sel_t> &child_sel) {
441 // resize the offset buffer - the offset buffer holds the offsets into the child array
442 idx_t size = to - from;
443 append_data.main_buffer.resize(bytes: append_data.main_buffer.size() + sizeof(uint32_t) * (size + 1));
444 auto data = UnifiedVectorFormat::GetData<list_entry_t>(format);
445 auto offset_data = append_data.main_buffer.GetData<uint32_t>();
446 if (append_data.row_count == 0) {
447 // first entry
448 offset_data[0] = 0;
449 }
450 // set up the offsets using the list entries
451 auto last_offset = offset_data[append_data.row_count];
452 for (idx_t i = from; i < to; i++) {
453 auto source_idx = format.sel->get_index(idx: i);
454 auto offset_idx = append_data.row_count + i + 1 - from;
455
456 if (!format.validity.RowIsValid(row_idx: source_idx)) {
457 offset_data[offset_idx] = last_offset;
458 continue;
459 }
460
461 // append the offset data
462 auto list_length = data[source_idx].length;
463 last_offset += list_length;
464 offset_data[offset_idx] = last_offset;
465
466 for (idx_t k = 0; k < list_length; k++) {
467 child_sel.push_back(x: data[source_idx].offset + k);
468 }
469 }
470}
471
472struct ArrowListData {
473 static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) {
474 auto &child_type = ListType::GetChildType(type);
475 result.main_buffer.reserve(bytes: (capacity + 1) * sizeof(uint32_t));
476 auto child_buffer = InitializeArrowChild(type: child_type, capacity, options&: result.options);
477 result.child_data.push_back(x: std::move(child_buffer));
478 }
479
480 static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) {
481 UnifiedVectorFormat format;
482 input.ToUnifiedFormat(count: input_size, data&: format);
483 idx_t size = to - from;
484 vector<sel_t> child_indices;
485 AppendValidity(append_data, format, from, to);
486 AppendListOffsets(append_data, format, from, to, child_sel&: child_indices);
487
488 // append the child vector of the list
489 SelectionVector child_sel(child_indices.data());
490 auto &child = ListVector::GetEntry(vector&: input);
491 auto child_size = child_indices.size();
492 if (size != input_size) {
493 // Let's avoid doing this
494 Vector child_copy(child.GetType());
495 child_copy.Slice(other&: child, sel: child_sel, count: child_size);
496 append_data.child_data[0]->append_vector(*append_data.child_data[0], child_copy, 0, child_size, child_size);
497 } else {
498 // We don't care about the vector, slice it
499 child.Slice(sel: child_sel, count: child_size);
500 append_data.child_data[0]->append_vector(*append_data.child_data[0], child, 0, child_size, child_size);
501 }
502 append_data.row_count += size;
503 }
504
505 static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) {
506 result->n_buffers = 2;
507 result->buffers[1] = append_data.main_buffer.data();
508
509 auto &child_type = ListType::GetChildType(type);
510 append_data.child_pointers.resize(new_size: 1);
511 result->children = append_data.child_pointers.data();
512 result->n_children = 1;
513 append_data.child_pointers[0] = FinalizeArrowChild(type: child_type, append_data&: *append_data.child_data[0]);
514 }
515};
516
517//===--------------------------------------------------------------------===//
518// Maps
519//===--------------------------------------------------------------------===//
520struct ArrowMapData {
521 static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) {
522 // map types are stored in a (too) clever way
523 // the main buffer holds the null values and the offsets
524 // then we have a single child, which is a struct of the map_type, and the key_type
525 result.main_buffer.reserve(bytes: (capacity + 1) * sizeof(uint32_t));
526
527 auto &key_type = MapType::KeyType(type);
528 auto &value_type = MapType::ValueType(type);
529 auto internal_struct = make_uniq<ArrowAppendData>(args&: result.options);
530 internal_struct->child_data.push_back(x: InitializeArrowChild(type: key_type, capacity, options&: result.options));
531 internal_struct->child_data.push_back(x: InitializeArrowChild(type: value_type, capacity, options&: result.options));
532
533 result.child_data.push_back(x: std::move(internal_struct));
534 }
535
536 static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) {
537 UnifiedVectorFormat format;
538 input.ToUnifiedFormat(count: input_size, data&: format);
539 idx_t size = to - from;
540 AppendValidity(append_data, format, from, to);
541 vector<sel_t> child_indices;
542 AppendListOffsets(append_data, format, from, to, child_sel&: child_indices);
543
544 SelectionVector child_sel(child_indices.data());
545 auto &key_vector = MapVector::GetKeys(vector&: input);
546 auto &value_vector = MapVector::GetValues(vector&: input);
547 auto list_size = child_indices.size();
548
549 auto &struct_data = *append_data.child_data[0];
550 auto &key_data = *struct_data.child_data[0];
551 auto &value_data = *struct_data.child_data[1];
552
553 if (size != input_size) {
554 // Let's avoid doing this
555 Vector key_vector_copy(key_vector.GetType());
556 key_vector_copy.Slice(other&: key_vector, sel: child_sel, count: list_size);
557 Vector value_vector_copy(value_vector.GetType());
558 value_vector_copy.Slice(other&: value_vector, sel: child_sel, count: list_size);
559 key_data.append_vector(key_data, key_vector_copy, 0, list_size, list_size);
560 value_data.append_vector(value_data, value_vector_copy, 0, list_size, list_size);
561 } else {
562 // We don't care about the vector, slice it
563 key_vector.Slice(sel: child_sel, count: list_size);
564 value_vector.Slice(sel: child_sel, count: list_size);
565 key_data.append_vector(key_data, key_vector, 0, list_size, list_size);
566 value_data.append_vector(value_data, value_vector, 0, list_size, list_size);
567 }
568
569 append_data.row_count += size;
570 struct_data.row_count += size;
571 }
572
573 static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) {
574 // set up the main map buffer
575 result->n_buffers = 2;
576 result->buffers[1] = append_data.main_buffer.data();
577
578 // the main map buffer has a single child: a struct
579 append_data.child_pointers.resize(new_size: 1);
580 result->children = append_data.child_pointers.data();
581 result->n_children = 1;
582 append_data.child_pointers[0] = FinalizeArrowChild(type, append_data&: *append_data.child_data[0]);
583
584 // now that struct has two children: the key and the value type
585 auto &struct_data = *append_data.child_data[0];
586 auto &struct_result = append_data.child_pointers[0];
587 struct_data.child_pointers.resize(new_size: 2);
588 struct_result->n_buffers = 1;
589 struct_result->n_children = 2;
590 struct_result->length = struct_data.child_data[0]->row_count;
591 struct_result->children = struct_data.child_pointers.data();
592
593 D_ASSERT(struct_data.child_data[0]->row_count == struct_data.child_data[1]->row_count);
594
595 auto &key_type = MapType::KeyType(type);
596 auto &value_type = MapType::ValueType(type);
597 struct_data.child_pointers[0] = FinalizeArrowChild(type: key_type, append_data&: *struct_data.child_data[0]);
598 struct_data.child_pointers[1] = FinalizeArrowChild(type: value_type, append_data&: *struct_data.child_data[1]);
599
600 // keys cannot have null values
601 if (struct_data.child_pointers[0]->null_count > 0) {
602 throw std::runtime_error("Arrow doesn't accept NULL keys on Maps");
603 }
604 }
605};
606
607//! Append a data chunk to the underlying arrow array
608void ArrowAppender::Append(DataChunk &input, idx_t from, idx_t to, idx_t input_size) {
609 D_ASSERT(types == input.GetTypes());
610 for (idx_t i = 0; i < input.ColumnCount(); i++) {
611 root_data[i]->append_vector(*root_data[i], input.data[i], from, to, input_size);
612 }
613 row_count += to - from;
614}
615//===--------------------------------------------------------------------===//
616// Initialize Arrow Child
617//===--------------------------------------------------------------------===//
618template <class OP>
619static void InitializeFunctionPointers(ArrowAppendData &append_data) {
620 append_data.initialize = OP::Initialize;
621 append_data.append_vector = OP::Append;
622 append_data.finalize = OP::Finalize;
623}
624
625static void InitializeFunctionPointers(ArrowAppendData &append_data, const LogicalType &type) {
626 // handle special logical types
627 switch (type.id()) {
628 case LogicalTypeId::BOOLEAN:
629 InitializeFunctionPointers<ArrowBoolData>(append_data);
630 break;
631 case LogicalTypeId::TINYINT:
632 InitializeFunctionPointers<ArrowScalarData<int8_t>>(append_data);
633 break;
634 case LogicalTypeId::SMALLINT:
635 InitializeFunctionPointers<ArrowScalarData<int16_t>>(append_data);
636 break;
637 case LogicalTypeId::DATE:
638 case LogicalTypeId::INTEGER:
639 InitializeFunctionPointers<ArrowScalarData<int32_t>>(append_data);
640 break;
641 case LogicalTypeId::TIME:
642 case LogicalTypeId::TIMESTAMP_SEC:
643 case LogicalTypeId::TIMESTAMP_MS:
644 case LogicalTypeId::TIMESTAMP:
645 case LogicalTypeId::TIMESTAMP_NS:
646 case LogicalTypeId::TIMESTAMP_TZ:
647 case LogicalTypeId::TIME_TZ:
648 case LogicalTypeId::BIGINT:
649 InitializeFunctionPointers<ArrowScalarData<int64_t>>(append_data);
650 break;
651 case LogicalTypeId::HUGEINT:
652 InitializeFunctionPointers<ArrowScalarData<hugeint_t>>(append_data);
653 break;
654 case LogicalTypeId::UTINYINT:
655 InitializeFunctionPointers<ArrowScalarData<uint8_t>>(append_data);
656 break;
657 case LogicalTypeId::USMALLINT:
658 InitializeFunctionPointers<ArrowScalarData<uint16_t>>(append_data);
659 break;
660 case LogicalTypeId::UINTEGER:
661 InitializeFunctionPointers<ArrowScalarData<uint32_t>>(append_data);
662 break;
663 case LogicalTypeId::UBIGINT:
664 InitializeFunctionPointers<ArrowScalarData<uint64_t>>(append_data);
665 break;
666 case LogicalTypeId::FLOAT:
667 InitializeFunctionPointers<ArrowScalarData<float>>(append_data);
668 break;
669 case LogicalTypeId::DOUBLE:
670 InitializeFunctionPointers<ArrowScalarData<double>>(append_data);
671 break;
672 case LogicalTypeId::DECIMAL:
673 switch (type.InternalType()) {
674 case PhysicalType::INT16:
675 InitializeFunctionPointers<ArrowScalarData<hugeint_t, int16_t>>(append_data);
676 break;
677 case PhysicalType::INT32:
678 InitializeFunctionPointers<ArrowScalarData<hugeint_t, int32_t>>(append_data);
679 break;
680 case PhysicalType::INT64:
681 InitializeFunctionPointers<ArrowScalarData<hugeint_t, int64_t>>(append_data);
682 break;
683 case PhysicalType::INT128:
684 InitializeFunctionPointers<ArrowScalarData<hugeint_t>>(append_data);
685 break;
686 default:
687 throw InternalException("Unsupported internal decimal type");
688 }
689 break;
690 case LogicalTypeId::VARCHAR:
691 case LogicalTypeId::BLOB:
692 case LogicalTypeId::BIT:
693 if (append_data.options.offset_size == ArrowOffsetSize::LARGE) {
694 InitializeFunctionPointers<ArrowVarcharData<string_t>>(append_data);
695 } else {
696 InitializeFunctionPointers<ArrowVarcharData<string_t, ArrowVarcharConverter, uint32_t>>(append_data);
697 }
698 break;
699 case LogicalTypeId::UUID:
700 if (append_data.options.offset_size == ArrowOffsetSize::LARGE) {
701 InitializeFunctionPointers<ArrowVarcharData<hugeint_t, ArrowUUIDConverter>>(append_data);
702 } else {
703 InitializeFunctionPointers<ArrowVarcharData<hugeint_t, ArrowUUIDConverter, uint32_t>>(append_data);
704 }
705 break;
706 case LogicalTypeId::ENUM:
707 switch (type.InternalType()) {
708 case PhysicalType::UINT8:
709 InitializeFunctionPointers<ArrowEnumData<uint8_t>>(append_data);
710 break;
711 case PhysicalType::UINT16:
712 InitializeFunctionPointers<ArrowEnumData<uint16_t>>(append_data);
713 break;
714 case PhysicalType::UINT32:
715 InitializeFunctionPointers<ArrowEnumData<uint32_t>>(append_data);
716 break;
717 default:
718 throw InternalException("Unsupported internal enum type");
719 }
720 break;
721 case LogicalTypeId::INTERVAL:
722 InitializeFunctionPointers<ArrowScalarData<ArrowInterval, interval_t, ArrowIntervalConverter>>(append_data);
723 break;
724 case LogicalTypeId::STRUCT:
725 InitializeFunctionPointers<ArrowStructData>(append_data);
726 break;
727 case LogicalTypeId::LIST:
728 InitializeFunctionPointers<ArrowListData>(append_data);
729 break;
730 case LogicalTypeId::MAP:
731 InitializeFunctionPointers<ArrowMapData>(append_data);
732 break;
733 default:
734 throw InternalException("Unsupported type in DuckDB -> Arrow Conversion: %s\n", type.ToString());
735 }
736}
737
738unique_ptr<ArrowAppendData> InitializeArrowChild(const LogicalType &type, idx_t capacity, ArrowOptions &options) {
739 auto result = make_uniq<ArrowAppendData>(args&: options);
740 InitializeFunctionPointers(append_data&: *result, type);
741
742 auto byte_count = (capacity + 7) / 8;
743 result->validity.reserve(bytes: byte_count);
744 result->initialize(*result, type, capacity);
745 return result;
746}
747
748static void ReleaseDuckDBArrowAppendArray(ArrowArray *array) {
749 if (!array || !array->release) {
750 return;
751 }
752 array->release = nullptr;
753 auto holder = static_cast<ArrowAppendData *>(array->private_data);
754 delete holder;
755}
756
757//===--------------------------------------------------------------------===//
758// Finalize Arrow Child
759//===--------------------------------------------------------------------===//
760ArrowArray *FinalizeArrowChild(const LogicalType &type, ArrowAppendData &append_data) {
761 auto result = make_uniq<ArrowArray>();
762
763 result->private_data = nullptr;
764 result->release = ReleaseDuckDBArrowAppendArray;
765 result->n_children = 0;
766 result->null_count = 0;
767 result->offset = 0;
768 result->dictionary = nullptr;
769 result->buffers = append_data.buffers.data();
770 result->null_count = append_data.null_count;
771 result->length = append_data.row_count;
772 result->buffers[0] = append_data.validity.data();
773
774 if (append_data.finalize) {
775 append_data.finalize(append_data, type, result.get());
776 }
777
778 append_data.array = std::move(result);
779 return append_data.array.get();
780}
781
782//! Returns the underlying arrow array
783ArrowArray ArrowAppender::Finalize() {
784 D_ASSERT(root_data.size() == types.size());
785 auto root_holder = make_uniq<ArrowAppendData>(args&: options);
786
787 ArrowArray result;
788 root_holder->child_pointers.resize(new_size: types.size());
789 result.children = root_holder->child_pointers.data();
790 result.n_children = types.size();
791
792 // Configure root array
793 result.length = row_count;
794 result.n_children = types.size();
795 result.n_buffers = 1;
796 result.buffers = root_holder->buffers.data(); // there is no actual buffer there since we don't have NULLs
797 result.offset = 0;
798 result.null_count = 0; // needs to be 0
799 result.dictionary = nullptr;
800 root_holder->child_data = std::move(root_data);
801
802 for (idx_t i = 0; i < root_holder->child_data.size(); i++) {
803 root_holder->child_pointers[i] = FinalizeArrowChild(type: types[i], append_data&: *root_holder->child_data[i]);
804 }
805
806 // Release ownership to caller
807 result.private_data = root_holder.release();
808 result.release = ReleaseDuckDBArrowAppendArray;
809 return result;
810}
811
812} // namespace duckdb
813