1 | #include "duckdb/common/arrow/arrow_appender.hpp" |
2 | #include "duckdb/common/arrow/arrow_buffer.hpp" |
3 | #include "duckdb/common/vector.hpp" |
4 | #include "duckdb/common/array.hpp" |
5 | #include "duckdb/common/types/interval.hpp" |
6 | #include "duckdb/common/types/uuid.hpp" |
7 | #include "duckdb/function/table/arrow.hpp" |
8 | |
9 | namespace duckdb { |
10 | |
11 | //===--------------------------------------------------------------------===// |
12 | // Arrow append data |
13 | //===--------------------------------------------------------------------===// |
14 | typedef void (*initialize_t)(ArrowAppendData &result, const LogicalType &type, idx_t capacity); |
15 | typedef void (*append_vector_t)(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size); |
16 | typedef void (*finalize_t)(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result); |
17 | |
18 | struct ArrowAppendData { |
19 | explicit ArrowAppendData(ArrowOptions &options_p) : options(options_p) { |
20 | } |
21 | // the buffers of the arrow vector |
22 | ArrowBuffer validity; |
23 | ArrowBuffer main_buffer; |
24 | ArrowBuffer aux_buffer; |
25 | |
26 | idx_t row_count = 0; |
27 | idx_t null_count = 0; |
28 | |
29 | // function pointers for construction |
30 | initialize_t initialize = nullptr; |
31 | append_vector_t append_vector = nullptr; |
32 | finalize_t finalize = nullptr; |
33 | |
34 | // child data (if any) |
35 | vector<unique_ptr<ArrowAppendData>> child_data; |
36 | |
37 | // the arrow array C API data, only set after Finalize |
38 | unique_ptr<ArrowArray> array; |
39 | duckdb::array<const void *, 3> buffers = {._M_elems: {nullptr, nullptr, nullptr}}; |
40 | vector<ArrowArray *> child_pointers; |
41 | |
42 | ArrowOptions options; |
43 | }; |
44 | |
45 | //===--------------------------------------------------------------------===// |
46 | // ArrowAppender |
47 | //===--------------------------------------------------------------------===// |
48 | static unique_ptr<ArrowAppendData> InitializeArrowChild(const LogicalType &type, idx_t capacity, ArrowOptions &options); |
49 | static ArrowArray *FinalizeArrowChild(const LogicalType &type, ArrowAppendData &append_data); |
50 | |
51 | ArrowAppender::ArrowAppender(vector<LogicalType> types_p, idx_t initial_capacity, ArrowOptions options) |
52 | : types(std::move(types_p)) { |
53 | for (auto &type : types) { |
54 | auto entry = InitializeArrowChild(type, capacity: initial_capacity, options); |
55 | root_data.push_back(x: std::move(entry)); |
56 | } |
57 | } |
58 | |
59 | ArrowAppender::~ArrowAppender() { |
60 | } |
61 | |
62 | //===--------------------------------------------------------------------===// |
63 | // Append Helper Functions |
64 | //===--------------------------------------------------------------------===// |
65 | static void GetBitPosition(idx_t row_idx, idx_t ¤t_byte, uint8_t ¤t_bit) { |
66 | current_byte = row_idx / 8; |
67 | current_bit = row_idx % 8; |
68 | } |
69 | |
70 | static void UnsetBit(uint8_t *data, idx_t current_byte, uint8_t current_bit) { |
71 | data[current_byte] &= ~((uint64_t)1 << current_bit); |
72 | } |
73 | |
74 | static void NextBit(idx_t ¤t_byte, uint8_t ¤t_bit) { |
75 | current_bit++; |
76 | if (current_bit == 8) { |
77 | current_byte++; |
78 | current_bit = 0; |
79 | } |
80 | } |
81 | |
82 | static void ResizeValidity(ArrowBuffer &buffer, idx_t row_count) { |
83 | auto byte_count = (row_count + 7) / 8; |
84 | buffer.resize(bytes: byte_count, value: 0xFF); |
85 | } |
86 | |
87 | static void SetNull(ArrowAppendData &append_data, uint8_t *validity_data, idx_t current_byte, uint8_t current_bit) { |
88 | UnsetBit(data: validity_data, current_byte, current_bit); |
89 | append_data.null_count++; |
90 | } |
91 | |
92 | static void AppendValidity(ArrowAppendData &append_data, UnifiedVectorFormat &format, idx_t from, idx_t to) { |
93 | // resize the buffer, filling the validity buffer with all valid values |
94 | idx_t size = to - from; |
95 | ResizeValidity(buffer&: append_data.validity, row_count: append_data.row_count + size); |
96 | if (format.validity.AllValid()) { |
97 | // if all values are valid we don't need to do anything else |
98 | return; |
99 | } |
100 | |
101 | // otherwise we iterate through the validity mask |
102 | auto validity_data = (uint8_t *)append_data.validity.data(); |
103 | uint8_t current_bit; |
104 | idx_t current_byte; |
105 | GetBitPosition(row_idx: append_data.row_count, current_byte, current_bit); |
106 | for (idx_t i = from; i < to; i++) { |
107 | auto source_idx = format.sel->get_index(idx: i); |
108 | // append the validity mask |
109 | if (!format.validity.RowIsValid(row_idx: source_idx)) { |
110 | SetNull(append_data, validity_data, current_byte, current_bit); |
111 | } |
112 | NextBit(current_byte, current_bit); |
113 | } |
114 | } |
115 | |
116 | //===--------------------------------------------------------------------===// |
117 | // Scalar Types |
118 | //===--------------------------------------------------------------------===// |
//! Default converter for fixed-width values: the source value is converted implicitly to the target type
struct ArrowScalarConverter {
	//! NULL rows are converted like any other row
	static bool SkipNulls() {
		return false;
	}

	//! Hook for writing a placeholder into a NULL slot; intentionally does nothing
	template <class TGT>
	static void SetNull(TGT &value) {
	}

	//! Returns `input` converted (implicitly) to TGT
	template <class TGT, class SRC>
	static TGT Operation(SRC input) {
		TGT converted = input;
		return converted;
	}
};
133 | |
134 | struct ArrowIntervalConverter { |
135 | template <class TGT, class SRC> |
136 | static TGT Operation(SRC input) { |
137 | ArrowInterval result; |
138 | result.months = input.months; |
139 | result.days = input.days; |
140 | result.nanoseconds = input.micros * Interval::NANOS_PER_MICRO; |
141 | return result; |
142 | } |
143 | |
144 | static bool SkipNulls() { |
145 | return true; |
146 | } |
147 | |
148 | template <class TGT> |
149 | static void SetNull(TGT &value) { |
150 | } |
151 | }; |
152 | |
153 | template <class TGT, class SRC = TGT, class OP = ArrowScalarConverter> |
154 | struct ArrowScalarBaseData { |
155 | static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) { |
156 | idx_t size = to - from; |
157 | UnifiedVectorFormat format; |
158 | input.ToUnifiedFormat(count: input_size, data&: format); |
159 | |
160 | // append the validity mask |
161 | AppendValidity(append_data, format, from, to); |
162 | |
163 | // append the main data |
164 | append_data.main_buffer.resize(bytes: append_data.main_buffer.size() + sizeof(TGT) * size); |
165 | auto data = UnifiedVectorFormat::GetData<SRC>(format); |
166 | auto result_data = append_data.main_buffer.GetData<TGT>(); |
167 | |
168 | for (idx_t i = from; i < to; i++) { |
169 | auto source_idx = format.sel->get_index(idx: i); |
170 | auto result_idx = append_data.row_count + i - from; |
171 | |
172 | if (OP::SkipNulls() && !format.validity.RowIsValid(row_idx: source_idx)) { |
173 | OP::template SetNull<TGT>(result_data[result_idx]); |
174 | continue; |
175 | } |
176 | result_data[result_idx] = OP::template Operation<TGT, SRC>(data[source_idx]); |
177 | } |
178 | append_data.row_count += size; |
179 | } |
180 | }; |
181 | |
182 | template <class TGT, class SRC = TGT, class OP = ArrowScalarConverter> |
183 | struct ArrowScalarData : public ArrowScalarBaseData<TGT, SRC, OP> { |
184 | static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) { |
185 | result.main_buffer.reserve(bytes: capacity * sizeof(TGT)); |
186 | } |
187 | |
188 | static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) { |
189 | result->n_buffers = 2; |
190 | result->buffers[1] = append_data.main_buffer.data(); |
191 | } |
192 | }; |
193 | |
194 | //===--------------------------------------------------------------------===// |
195 | // Enums |
196 | //===--------------------------------------------------------------------===// |
197 | template <class TGT> |
198 | struct ArrowEnumData : public ArrowScalarBaseData<TGT> { |
199 | static idx_t GetLength(string_t input) { |
200 | return input.GetSize(); |
201 | } |
202 | static void WriteData(data_ptr_t target, string_t input) { |
203 | memcpy(dest: target, src: input.GetData(), n: input.GetSize()); |
204 | } |
205 | static void EnumAppendVector(ArrowAppendData &append_data, const Vector &input, idx_t size) { |
206 | D_ASSERT(input.GetVectorType() == VectorType::FLAT_VECTOR); |
207 | |
208 | // resize the validity mask and set up the validity buffer for iteration |
209 | ResizeValidity(buffer&: append_data.validity, row_count: append_data.row_count + size); |
210 | |
211 | // resize the offset buffer - the offset buffer holds the offsets into the child array |
212 | append_data.main_buffer.resize(bytes: append_data.main_buffer.size() + sizeof(uint32_t) * (size + 1)); |
213 | auto data = FlatVector::GetData<string_t>(vector: input); |
214 | auto offset_data = append_data.main_buffer.GetData<uint32_t>(); |
215 | if (append_data.row_count == 0) { |
216 | // first entry |
217 | offset_data[0] = 0; |
218 | } |
219 | // now append the string data to the auxiliary buffer |
220 | // the auxiliary buffer's length depends on the string lengths, so we resize as required |
221 | auto last_offset = offset_data[append_data.row_count]; |
222 | for (idx_t i = 0; i < size; i++) { |
223 | auto offset_idx = append_data.row_count + i + 1; |
224 | |
225 | auto string_length = GetLength(input: data[i]); |
226 | |
227 | // append the offset data |
228 | auto current_offset = last_offset + string_length; |
229 | offset_data[offset_idx] = current_offset; |
230 | |
231 | // resize the string buffer if required, and write the string data |
232 | append_data.aux_buffer.resize(bytes: current_offset); |
233 | WriteData(target: append_data.aux_buffer.data() + last_offset, input: data[i]); |
234 | |
235 | last_offset = current_offset; |
236 | } |
237 | append_data.row_count += size; |
238 | } |
239 | static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) { |
240 | result.main_buffer.reserve(bytes: capacity * sizeof(TGT)); |
241 | // construct the enum child data |
242 | auto enum_data = InitializeArrowChild(type: LogicalType::VARCHAR, capacity: EnumType::GetSize(type), options&: result.options); |
243 | EnumAppendVector(append_data&: *enum_data, input: EnumType::GetValuesInsertOrder(type), size: EnumType::GetSize(type)); |
244 | result.child_data.push_back(x: std::move(enum_data)); |
245 | } |
246 | |
247 | static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) { |
248 | result->n_buffers = 2; |
249 | result->buffers[1] = append_data.main_buffer.data(); |
250 | // finalize the enum child data, and assign it to the dictionary |
251 | result->dictionary = FinalizeArrowChild(type: LogicalType::VARCHAR, append_data&: *append_data.child_data[0]); |
252 | } |
253 | }; |
254 | |
255 | //===--------------------------------------------------------------------===// |
256 | // Boolean |
257 | //===--------------------------------------------------------------------===// |
258 | struct ArrowBoolData { |
259 | static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) { |
260 | auto byte_count = (capacity + 7) / 8; |
261 | result.main_buffer.reserve(bytes: byte_count); |
262 | } |
263 | |
264 | static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) { |
265 | idx_t size = to - from; |
266 | UnifiedVectorFormat format; |
267 | input.ToUnifiedFormat(count: input_size, data&: format); |
268 | |
269 | // we initialize both the validity and the bit set to 1's |
270 | ResizeValidity(buffer&: append_data.validity, row_count: append_data.row_count + size); |
271 | ResizeValidity(buffer&: append_data.main_buffer, row_count: append_data.row_count + size); |
272 | auto data = UnifiedVectorFormat::GetData<bool>(format); |
273 | |
274 | auto result_data = append_data.main_buffer.GetData<uint8_t>(); |
275 | auto validity_data = append_data.validity.GetData<uint8_t>(); |
276 | uint8_t current_bit; |
277 | idx_t current_byte; |
278 | GetBitPosition(row_idx: append_data.row_count, current_byte, current_bit); |
279 | for (idx_t i = from; i < to; i++) { |
280 | auto source_idx = format.sel->get_index(idx: i); |
281 | // append the validity mask |
282 | if (!format.validity.RowIsValid(row_idx: source_idx)) { |
283 | SetNull(append_data, validity_data, current_byte, current_bit); |
284 | } else if (!data[source_idx]) { |
285 | UnsetBit(data: result_data, current_byte, current_bit); |
286 | } |
287 | NextBit(current_byte, current_bit); |
288 | } |
289 | append_data.row_count += size; |
290 | } |
291 | |
292 | static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) { |
293 | result->n_buffers = 2; |
294 | result->buffers[1] = append_data.main_buffer.data(); |
295 | } |
296 | }; |
297 | |
298 | //===--------------------------------------------------------------------===// |
299 | // Varchar |
300 | //===--------------------------------------------------------------------===// |
301 | struct ArrowVarcharConverter { |
302 | template <class SRC> |
303 | static idx_t GetLength(SRC input) { |
304 | return input.GetSize(); |
305 | } |
306 | |
307 | template <class SRC> |
308 | static void WriteData(data_ptr_t target, SRC input) { |
309 | memcpy(target, input.GetData(), input.GetSize()); |
310 | } |
311 | }; |
312 | |
313 | struct ArrowUUIDConverter { |
314 | template <class SRC> |
315 | static idx_t GetLength(SRC input) { |
316 | return UUID::STRING_SIZE; |
317 | } |
318 | |
319 | template <class SRC> |
320 | static void WriteData(data_ptr_t target, SRC input) { |
321 | UUID::ToString(input, char_ptr_cast(src: target)); |
322 | } |
323 | }; |
324 | |
325 | template <class SRC = string_t, class OP = ArrowVarcharConverter, class BUFTYPE = uint64_t> |
326 | struct ArrowVarcharData { |
327 | static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) { |
328 | result.main_buffer.reserve(bytes: (capacity + 1) * sizeof(BUFTYPE)); |
329 | |
330 | result.aux_buffer.reserve(bytes: capacity); |
331 | } |
332 | |
333 | static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) { |
334 | idx_t size = to - from; |
335 | UnifiedVectorFormat format; |
336 | input.ToUnifiedFormat(count: input_size, data&: format); |
337 | |
338 | // resize the validity mask and set up the validity buffer for iteration |
339 | ResizeValidity(buffer&: append_data.validity, row_count: append_data.row_count + size); |
340 | auto validity_data = (uint8_t *)append_data.validity.data(); |
341 | |
342 | // resize the offset buffer - the offset buffer holds the offsets into the child array |
343 | append_data.main_buffer.resize(bytes: append_data.main_buffer.size() + sizeof(BUFTYPE) * (size + 1)); |
344 | auto data = UnifiedVectorFormat::GetData<SRC>(format); |
345 | auto offset_data = append_data.main_buffer.GetData<BUFTYPE>(); |
346 | if (append_data.row_count == 0) { |
347 | // first entry |
348 | offset_data[0] = 0; |
349 | } |
350 | // now append the string data to the auxiliary buffer |
351 | // the auxiliary buffer's length depends on the string lengths, so we resize as required |
352 | auto last_offset = offset_data[append_data.row_count]; |
353 | idx_t max_offset = append_data.row_count + to - from; |
354 | if (max_offset > NumericLimits<uint32_t>::Maximum() && |
355 | append_data.options.offset_size == ArrowOffsetSize::REGULAR) { |
356 | throw InvalidInputException("Arrow Appender: The maximum total string size for regular string buffers is " |
357 | "%u but the offset of %lu exceeds this." , |
358 | NumericLimits<uint32_t>::Maximum(), max_offset); |
359 | } |
360 | for (idx_t i = from; i < to; i++) { |
361 | auto source_idx = format.sel->get_index(idx: i); |
362 | auto offset_idx = append_data.row_count + i + 1 - from; |
363 | |
364 | if (!format.validity.RowIsValid(row_idx: source_idx)) { |
365 | uint8_t current_bit; |
366 | idx_t current_byte; |
367 | GetBitPosition(row_idx: append_data.row_count + i - from, current_byte, current_bit); |
368 | SetNull(append_data, validity_data, current_byte, current_bit); |
369 | offset_data[offset_idx] = last_offset; |
370 | continue; |
371 | } |
372 | |
373 | auto string_length = OP::GetLength(data[source_idx]); |
374 | |
375 | // append the offset data |
376 | auto current_offset = last_offset + string_length; |
377 | offset_data[offset_idx] = current_offset; |
378 | |
379 | // resize the string buffer if required, and write the string data |
380 | append_data.aux_buffer.resize(current_offset); |
381 | OP::WriteData(append_data.aux_buffer.data() + last_offset, data[source_idx]); |
382 | |
383 | last_offset = current_offset; |
384 | } |
385 | append_data.row_count += size; |
386 | } |
387 | |
388 | static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) { |
389 | result->n_buffers = 3; |
390 | result->buffers[1] = append_data.main_buffer.data(); |
391 | result->buffers[2] = append_data.aux_buffer.data(); |
392 | } |
393 | }; |
394 | |
395 | //===--------------------------------------------------------------------===// |
396 | // Structs |
397 | //===--------------------------------------------------------------------===// |
398 | struct ArrowStructData { |
399 | static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) { |
400 | auto &children = StructType::GetChildTypes(type); |
401 | for (auto &child : children) { |
402 | auto child_buffer = InitializeArrowChild(type: child.second, capacity, options&: result.options); |
403 | result.child_data.push_back(x: std::move(child_buffer)); |
404 | } |
405 | } |
406 | |
407 | static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) { |
408 | UnifiedVectorFormat format; |
409 | input.ToUnifiedFormat(count: input_size, data&: format); |
410 | idx_t size = to - from; |
411 | AppendValidity(append_data, format, from, to); |
412 | // append the children of the struct |
413 | auto &children = StructVector::GetEntries(vector&: input); |
414 | for (idx_t child_idx = 0; child_idx < children.size(); child_idx++) { |
415 | auto &child = children[child_idx]; |
416 | auto &child_data = *append_data.child_data[child_idx]; |
417 | child_data.append_vector(child_data, *child, from, to, size); |
418 | } |
419 | append_data.row_count += size; |
420 | } |
421 | |
422 | static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) { |
423 | result->n_buffers = 1; |
424 | |
425 | auto &child_types = StructType::GetChildTypes(type); |
426 | append_data.child_pointers.resize(new_size: child_types.size()); |
427 | result->children = append_data.child_pointers.data(); |
428 | result->n_children = child_types.size(); |
429 | for (idx_t i = 0; i < child_types.size(); i++) { |
430 | auto &child_type = child_types[i].second; |
431 | append_data.child_pointers[i] = FinalizeArrowChild(type: child_type, append_data&: *append_data.child_data[i]); |
432 | } |
433 | } |
434 | }; |
435 | |
436 | //===--------------------------------------------------------------------===// |
437 | // Lists |
438 | //===--------------------------------------------------------------------===// |
439 | void AppendListOffsets(ArrowAppendData &append_data, UnifiedVectorFormat &format, idx_t from, idx_t to, |
440 | vector<sel_t> &child_sel) { |
441 | // resize the offset buffer - the offset buffer holds the offsets into the child array |
442 | idx_t size = to - from; |
443 | append_data.main_buffer.resize(bytes: append_data.main_buffer.size() + sizeof(uint32_t) * (size + 1)); |
444 | auto data = UnifiedVectorFormat::GetData<list_entry_t>(format); |
445 | auto offset_data = append_data.main_buffer.GetData<uint32_t>(); |
446 | if (append_data.row_count == 0) { |
447 | // first entry |
448 | offset_data[0] = 0; |
449 | } |
450 | // set up the offsets using the list entries |
451 | auto last_offset = offset_data[append_data.row_count]; |
452 | for (idx_t i = from; i < to; i++) { |
453 | auto source_idx = format.sel->get_index(idx: i); |
454 | auto offset_idx = append_data.row_count + i + 1 - from; |
455 | |
456 | if (!format.validity.RowIsValid(row_idx: source_idx)) { |
457 | offset_data[offset_idx] = last_offset; |
458 | continue; |
459 | } |
460 | |
461 | // append the offset data |
462 | auto list_length = data[source_idx].length; |
463 | last_offset += list_length; |
464 | offset_data[offset_idx] = last_offset; |
465 | |
466 | for (idx_t k = 0; k < list_length; k++) { |
467 | child_sel.push_back(x: data[source_idx].offset + k); |
468 | } |
469 | } |
470 | } |
471 | |
472 | struct ArrowListData { |
473 | static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) { |
474 | auto &child_type = ListType::GetChildType(type); |
475 | result.main_buffer.reserve(bytes: (capacity + 1) * sizeof(uint32_t)); |
476 | auto child_buffer = InitializeArrowChild(type: child_type, capacity, options&: result.options); |
477 | result.child_data.push_back(x: std::move(child_buffer)); |
478 | } |
479 | |
480 | static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) { |
481 | UnifiedVectorFormat format; |
482 | input.ToUnifiedFormat(count: input_size, data&: format); |
483 | idx_t size = to - from; |
484 | vector<sel_t> child_indices; |
485 | AppendValidity(append_data, format, from, to); |
486 | AppendListOffsets(append_data, format, from, to, child_sel&: child_indices); |
487 | |
488 | // append the child vector of the list |
489 | SelectionVector child_sel(child_indices.data()); |
490 | auto &child = ListVector::GetEntry(vector&: input); |
491 | auto child_size = child_indices.size(); |
492 | if (size != input_size) { |
493 | // Let's avoid doing this |
494 | Vector child_copy(child.GetType()); |
495 | child_copy.Slice(other&: child, sel: child_sel, count: child_size); |
496 | append_data.child_data[0]->append_vector(*append_data.child_data[0], child_copy, 0, child_size, child_size); |
497 | } else { |
498 | // We don't care about the vector, slice it |
499 | child.Slice(sel: child_sel, count: child_size); |
500 | append_data.child_data[0]->append_vector(*append_data.child_data[0], child, 0, child_size, child_size); |
501 | } |
502 | append_data.row_count += size; |
503 | } |
504 | |
505 | static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) { |
506 | result->n_buffers = 2; |
507 | result->buffers[1] = append_data.main_buffer.data(); |
508 | |
509 | auto &child_type = ListType::GetChildType(type); |
510 | append_data.child_pointers.resize(new_size: 1); |
511 | result->children = append_data.child_pointers.data(); |
512 | result->n_children = 1; |
513 | append_data.child_pointers[0] = FinalizeArrowChild(type: child_type, append_data&: *append_data.child_data[0]); |
514 | } |
515 | }; |
516 | |
517 | //===--------------------------------------------------------------------===// |
518 | // Maps |
519 | //===--------------------------------------------------------------------===// |
520 | struct ArrowMapData { |
521 | static void Initialize(ArrowAppendData &result, const LogicalType &type, idx_t capacity) { |
522 | // map types are stored in a (too) clever way |
523 | // the main buffer holds the null values and the offsets |
524 | // then we have a single child, which is a struct of the map_type, and the key_type |
525 | result.main_buffer.reserve(bytes: (capacity + 1) * sizeof(uint32_t)); |
526 | |
527 | auto &key_type = MapType::KeyType(type); |
528 | auto &value_type = MapType::ValueType(type); |
529 | auto internal_struct = make_uniq<ArrowAppendData>(args&: result.options); |
530 | internal_struct->child_data.push_back(x: InitializeArrowChild(type: key_type, capacity, options&: result.options)); |
531 | internal_struct->child_data.push_back(x: InitializeArrowChild(type: value_type, capacity, options&: result.options)); |
532 | |
533 | result.child_data.push_back(x: std::move(internal_struct)); |
534 | } |
535 | |
536 | static void Append(ArrowAppendData &append_data, Vector &input, idx_t from, idx_t to, idx_t input_size) { |
537 | UnifiedVectorFormat format; |
538 | input.ToUnifiedFormat(count: input_size, data&: format); |
539 | idx_t size = to - from; |
540 | AppendValidity(append_data, format, from, to); |
541 | vector<sel_t> child_indices; |
542 | AppendListOffsets(append_data, format, from, to, child_sel&: child_indices); |
543 | |
544 | SelectionVector child_sel(child_indices.data()); |
545 | auto &key_vector = MapVector::GetKeys(vector&: input); |
546 | auto &value_vector = MapVector::GetValues(vector&: input); |
547 | auto list_size = child_indices.size(); |
548 | |
549 | auto &struct_data = *append_data.child_data[0]; |
550 | auto &key_data = *struct_data.child_data[0]; |
551 | auto &value_data = *struct_data.child_data[1]; |
552 | |
553 | if (size != input_size) { |
554 | // Let's avoid doing this |
555 | Vector key_vector_copy(key_vector.GetType()); |
556 | key_vector_copy.Slice(other&: key_vector, sel: child_sel, count: list_size); |
557 | Vector value_vector_copy(value_vector.GetType()); |
558 | value_vector_copy.Slice(other&: value_vector, sel: child_sel, count: list_size); |
559 | key_data.append_vector(key_data, key_vector_copy, 0, list_size, list_size); |
560 | value_data.append_vector(value_data, value_vector_copy, 0, list_size, list_size); |
561 | } else { |
562 | // We don't care about the vector, slice it |
563 | key_vector.Slice(sel: child_sel, count: list_size); |
564 | value_vector.Slice(sel: child_sel, count: list_size); |
565 | key_data.append_vector(key_data, key_vector, 0, list_size, list_size); |
566 | value_data.append_vector(value_data, value_vector, 0, list_size, list_size); |
567 | } |
568 | |
569 | append_data.row_count += size; |
570 | struct_data.row_count += size; |
571 | } |
572 | |
573 | static void Finalize(ArrowAppendData &append_data, const LogicalType &type, ArrowArray *result) { |
574 | // set up the main map buffer |
575 | result->n_buffers = 2; |
576 | result->buffers[1] = append_data.main_buffer.data(); |
577 | |
578 | // the main map buffer has a single child: a struct |
579 | append_data.child_pointers.resize(new_size: 1); |
580 | result->children = append_data.child_pointers.data(); |
581 | result->n_children = 1; |
582 | append_data.child_pointers[0] = FinalizeArrowChild(type, append_data&: *append_data.child_data[0]); |
583 | |
584 | // now that struct has two children: the key and the value type |
585 | auto &struct_data = *append_data.child_data[0]; |
586 | auto &struct_result = append_data.child_pointers[0]; |
587 | struct_data.child_pointers.resize(new_size: 2); |
588 | struct_result->n_buffers = 1; |
589 | struct_result->n_children = 2; |
590 | struct_result->length = struct_data.child_data[0]->row_count; |
591 | struct_result->children = struct_data.child_pointers.data(); |
592 | |
593 | D_ASSERT(struct_data.child_data[0]->row_count == struct_data.child_data[1]->row_count); |
594 | |
595 | auto &key_type = MapType::KeyType(type); |
596 | auto &value_type = MapType::ValueType(type); |
597 | struct_data.child_pointers[0] = FinalizeArrowChild(type: key_type, append_data&: *struct_data.child_data[0]); |
598 | struct_data.child_pointers[1] = FinalizeArrowChild(type: value_type, append_data&: *struct_data.child_data[1]); |
599 | |
600 | // keys cannot have null values |
601 | if (struct_data.child_pointers[0]->null_count > 0) { |
602 | throw std::runtime_error("Arrow doesn't accept NULL keys on Maps" ); |
603 | } |
604 | } |
605 | }; |
606 | |
607 | //! Append a data chunk to the underlying arrow array |
608 | void ArrowAppender::Append(DataChunk &input, idx_t from, idx_t to, idx_t input_size) { |
609 | D_ASSERT(types == input.GetTypes()); |
610 | for (idx_t i = 0; i < input.ColumnCount(); i++) { |
611 | root_data[i]->append_vector(*root_data[i], input.data[i], from, to, input_size); |
612 | } |
613 | row_count += to - from; |
614 | } |
615 | //===--------------------------------------------------------------------===// |
616 | // Initialize Arrow Child |
617 | //===--------------------------------------------------------------------===// |
618 | template <class OP> |
619 | static void InitializeFunctionPointers(ArrowAppendData &append_data) { |
620 | append_data.initialize = OP::Initialize; |
621 | append_data.append_vector = OP::Append; |
622 | append_data.finalize = OP::Finalize; |
623 | } |
624 | |
625 | static void InitializeFunctionPointers(ArrowAppendData &append_data, const LogicalType &type) { |
626 | // handle special logical types |
627 | switch (type.id()) { |
628 | case LogicalTypeId::BOOLEAN: |
629 | InitializeFunctionPointers<ArrowBoolData>(append_data); |
630 | break; |
631 | case LogicalTypeId::TINYINT: |
632 | InitializeFunctionPointers<ArrowScalarData<int8_t>>(append_data); |
633 | break; |
634 | case LogicalTypeId::SMALLINT: |
635 | InitializeFunctionPointers<ArrowScalarData<int16_t>>(append_data); |
636 | break; |
637 | case LogicalTypeId::DATE: |
638 | case LogicalTypeId::INTEGER: |
639 | InitializeFunctionPointers<ArrowScalarData<int32_t>>(append_data); |
640 | break; |
641 | case LogicalTypeId::TIME: |
642 | case LogicalTypeId::TIMESTAMP_SEC: |
643 | case LogicalTypeId::TIMESTAMP_MS: |
644 | case LogicalTypeId::TIMESTAMP: |
645 | case LogicalTypeId::TIMESTAMP_NS: |
646 | case LogicalTypeId::TIMESTAMP_TZ: |
647 | case LogicalTypeId::TIME_TZ: |
648 | case LogicalTypeId::BIGINT: |
649 | InitializeFunctionPointers<ArrowScalarData<int64_t>>(append_data); |
650 | break; |
651 | case LogicalTypeId::HUGEINT: |
652 | InitializeFunctionPointers<ArrowScalarData<hugeint_t>>(append_data); |
653 | break; |
654 | case LogicalTypeId::UTINYINT: |
655 | InitializeFunctionPointers<ArrowScalarData<uint8_t>>(append_data); |
656 | break; |
657 | case LogicalTypeId::USMALLINT: |
658 | InitializeFunctionPointers<ArrowScalarData<uint16_t>>(append_data); |
659 | break; |
660 | case LogicalTypeId::UINTEGER: |
661 | InitializeFunctionPointers<ArrowScalarData<uint32_t>>(append_data); |
662 | break; |
663 | case LogicalTypeId::UBIGINT: |
664 | InitializeFunctionPointers<ArrowScalarData<uint64_t>>(append_data); |
665 | break; |
666 | case LogicalTypeId::FLOAT: |
667 | InitializeFunctionPointers<ArrowScalarData<float>>(append_data); |
668 | break; |
669 | case LogicalTypeId::DOUBLE: |
670 | InitializeFunctionPointers<ArrowScalarData<double>>(append_data); |
671 | break; |
672 | case LogicalTypeId::DECIMAL: |
673 | switch (type.InternalType()) { |
674 | case PhysicalType::INT16: |
675 | InitializeFunctionPointers<ArrowScalarData<hugeint_t, int16_t>>(append_data); |
676 | break; |
677 | case PhysicalType::INT32: |
678 | InitializeFunctionPointers<ArrowScalarData<hugeint_t, int32_t>>(append_data); |
679 | break; |
680 | case PhysicalType::INT64: |
681 | InitializeFunctionPointers<ArrowScalarData<hugeint_t, int64_t>>(append_data); |
682 | break; |
683 | case PhysicalType::INT128: |
684 | InitializeFunctionPointers<ArrowScalarData<hugeint_t>>(append_data); |
685 | break; |
686 | default: |
687 | throw InternalException("Unsupported internal decimal type" ); |
688 | } |
689 | break; |
690 | case LogicalTypeId::VARCHAR: |
691 | case LogicalTypeId::BLOB: |
692 | case LogicalTypeId::BIT: |
693 | if (append_data.options.offset_size == ArrowOffsetSize::LARGE) { |
694 | InitializeFunctionPointers<ArrowVarcharData<string_t>>(append_data); |
695 | } else { |
696 | InitializeFunctionPointers<ArrowVarcharData<string_t, ArrowVarcharConverter, uint32_t>>(append_data); |
697 | } |
698 | break; |
699 | case LogicalTypeId::UUID: |
700 | if (append_data.options.offset_size == ArrowOffsetSize::LARGE) { |
701 | InitializeFunctionPointers<ArrowVarcharData<hugeint_t, ArrowUUIDConverter>>(append_data); |
702 | } else { |
703 | InitializeFunctionPointers<ArrowVarcharData<hugeint_t, ArrowUUIDConverter, uint32_t>>(append_data); |
704 | } |
705 | break; |
706 | case LogicalTypeId::ENUM: |
707 | switch (type.InternalType()) { |
708 | case PhysicalType::UINT8: |
709 | InitializeFunctionPointers<ArrowEnumData<uint8_t>>(append_data); |
710 | break; |
711 | case PhysicalType::UINT16: |
712 | InitializeFunctionPointers<ArrowEnumData<uint16_t>>(append_data); |
713 | break; |
714 | case PhysicalType::UINT32: |
715 | InitializeFunctionPointers<ArrowEnumData<uint32_t>>(append_data); |
716 | break; |
717 | default: |
718 | throw InternalException("Unsupported internal enum type" ); |
719 | } |
720 | break; |
721 | case LogicalTypeId::INTERVAL: |
722 | InitializeFunctionPointers<ArrowScalarData<ArrowInterval, interval_t, ArrowIntervalConverter>>(append_data); |
723 | break; |
724 | case LogicalTypeId::STRUCT: |
725 | InitializeFunctionPointers<ArrowStructData>(append_data); |
726 | break; |
727 | case LogicalTypeId::LIST: |
728 | InitializeFunctionPointers<ArrowListData>(append_data); |
729 | break; |
730 | case LogicalTypeId::MAP: |
731 | InitializeFunctionPointers<ArrowMapData>(append_data); |
732 | break; |
733 | default: |
734 | throw InternalException("Unsupported type in DuckDB -> Arrow Conversion: %s\n" , type.ToString()); |
735 | } |
736 | } |
737 | |
738 | unique_ptr<ArrowAppendData> InitializeArrowChild(const LogicalType &type, idx_t capacity, ArrowOptions &options) { |
739 | auto result = make_uniq<ArrowAppendData>(args&: options); |
740 | InitializeFunctionPointers(append_data&: *result, type); |
741 | |
742 | auto byte_count = (capacity + 7) / 8; |
743 | result->validity.reserve(bytes: byte_count); |
744 | result->initialize(*result, type, capacity); |
745 | return result; |
746 | } |
747 | |
748 | static void ReleaseDuckDBArrowAppendArray(ArrowArray *array) { |
749 | if (!array || !array->release) { |
750 | return; |
751 | } |
752 | array->release = nullptr; |
753 | auto holder = static_cast<ArrowAppendData *>(array->private_data); |
754 | delete holder; |
755 | } |
756 | |
757 | //===--------------------------------------------------------------------===// |
758 | // Finalize Arrow Child |
759 | //===--------------------------------------------------------------------===// |
760 | ArrowArray *FinalizeArrowChild(const LogicalType &type, ArrowAppendData &append_data) { |
761 | auto result = make_uniq<ArrowArray>(); |
762 | |
763 | result->private_data = nullptr; |
764 | result->release = ReleaseDuckDBArrowAppendArray; |
765 | result->n_children = 0; |
766 | result->null_count = 0; |
767 | result->offset = 0; |
768 | result->dictionary = nullptr; |
769 | result->buffers = append_data.buffers.data(); |
770 | result->null_count = append_data.null_count; |
771 | result->length = append_data.row_count; |
772 | result->buffers[0] = append_data.validity.data(); |
773 | |
774 | if (append_data.finalize) { |
775 | append_data.finalize(append_data, type, result.get()); |
776 | } |
777 | |
778 | append_data.array = std::move(result); |
779 | return append_data.array.get(); |
780 | } |
781 | |
782 | //! Returns the underlying arrow array |
783 | ArrowArray ArrowAppender::Finalize() { |
784 | D_ASSERT(root_data.size() == types.size()); |
785 | auto root_holder = make_uniq<ArrowAppendData>(args&: options); |
786 | |
787 | ArrowArray result; |
788 | root_holder->child_pointers.resize(new_size: types.size()); |
789 | result.children = root_holder->child_pointers.data(); |
790 | result.n_children = types.size(); |
791 | |
792 | // Configure root array |
793 | result.length = row_count; |
794 | result.n_children = types.size(); |
795 | result.n_buffers = 1; |
796 | result.buffers = root_holder->buffers.data(); // there is no actual buffer there since we don't have NULLs |
797 | result.offset = 0; |
798 | result.null_count = 0; // needs to be 0 |
799 | result.dictionary = nullptr; |
800 | root_holder->child_data = std::move(root_data); |
801 | |
802 | for (idx_t i = 0; i < root_holder->child_data.size(); i++) { |
803 | root_holder->child_pointers[i] = FinalizeArrowChild(type: types[i], append_data&: *root_holder->child_data[i]); |
804 | } |
805 | |
806 | // Release ownership to caller |
807 | result.private_data = root_holder.release(); |
808 | result.release = ReleaseDuckDBArrowAppendArray; |
809 | return result; |
810 | } |
811 | |
812 | } // namespace duckdb |
813 | |