1#include "duckdb/function/table/arrow.hpp"
2#include "duckdb/common/limits.hpp"
3#include "duckdb/common/operator/multiply.hpp"
4#include "duckdb/common/types/hugeint.hpp"
5#include "duckdb/common/types/arrow_aux_data.hpp"
6#include "duckdb/function/scalar/nested_functions.hpp"
7
namespace {
using duckdb::idx_t;
//! Cursor into the per-column ArrowConvertData vectors. Nested conversions read those vectors in order and
//! post-increment the matching index each time they consume an entry; a fresh {0, 0} cursor is used per output column.
struct ArrowConvertDataIndices {
	//! The index that refers to 'variable_sz_type' in ArrowConvertData
	idx_t variable_sized_index;
	//! The index that refers to 'date_time_precision' in ArrowConvertData
	idx_t datetime_precision_index;
};
} // namespace
17
18namespace duckdb {
19
//! Shifts a little-endian, LSB-first byte array (such as an Arrow validity bitmap) right by `shift` bits: every bit
//! moves `shift` positions towards the start of the array, bits shifted out of ar[0] are discarded and zeros are
//! shifted in at the top of ar[size - 1].
//! The carry starts at zero on every one-bit pass, so a bit dropped from ar[0] never wraps around into
//! ar[size - 1]. A non-positive `shift` leaves the array untouched.
static void ShiftRight(unsigned char *ar, int size, int shift) {
	for (int pass = 0; pass < shift; pass++) {
		//! Low bit of the byte above the current one; it becomes the current byte's high bit
		int carry = 0;
		for (int i = size - 1; i >= 0; --i) {
			int next = (ar[i] & 1) ? 0x80 : 0;
			ar[i] = static_cast<unsigned char>(carry | (ar[i] >> 1));
			carry = next;
		}
	}
}
30
31template <class T>
32T *ArrowBufferData(ArrowArray &array, idx_t buffer_idx) {
33 return (T *)array.buffers[buffer_idx]; // NOLINT
34}
35
36static void GetValidityMask(ValidityMask &mask, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
37 int64_t nested_offset = -1, bool add_null = false) {
38 // In certains we don't need to or cannot copy arrow's validity mask to duckdb.
39 //
40 // The conditions where we do want to copy arrow's mask to duckdb are:
41 // 1. nulls exist
42 // 2. n_buffers > 0, meaning the array's arrow type is not `null`
43 // 3. the validity buffer (the first buffer) is not a nullptr
44 if (array.null_count != 0 && array.n_buffers > 0 && array.buffers[0]) {
45 auto bit_offset = scan_state.chunk_offset + array.offset;
46 if (nested_offset != -1) {
47 bit_offset = nested_offset;
48 }
49 mask.EnsureWritable();
50#if STANDARD_VECTOR_SIZE > 64
51 auto n_bitmask_bytes = (size + 8 - 1) / 8;
52 if (bit_offset % 8 == 0) {
53 //! just memcpy nullmask
54 memcpy(dest: (void *)mask.GetData(), src: ArrowBufferData<uint8_t>(array, buffer_idx: 0) + bit_offset / 8, n: n_bitmask_bytes);
55 } else {
56 //! need to re-align nullmask
57 vector<uint8_t> temp_nullmask(n_bitmask_bytes + 1);
58 memcpy(dest: temp_nullmask.data(), src: ArrowBufferData<uint8_t>(array, buffer_idx: 0) + bit_offset / 8, n: n_bitmask_bytes + 1);
59 ShiftRight(ar: temp_nullmask.data(), size: n_bitmask_bytes + 1,
60 shift: bit_offset % 8); //! why this has to be a right shift is a mystery to me
61 memcpy(dest: (void *)mask.GetData(), src: data_ptr_cast(src: temp_nullmask.data()), n: n_bitmask_bytes);
62 }
63#else
64 auto byte_offset = bit_offset / 8;
65 auto source_data = ArrowBufferData<uint8_t>(array, 0);
66 bit_offset %= 8;
67 for (idx_t i = 0; i < size; i++) {
68 mask.Set(i, source_data[byte_offset] & (1 << bit_offset));
69 bit_offset++;
70 if (bit_offset == 8) {
71 bit_offset = 0;
72 byte_offset++;
73 }
74 }
75#endif
76 }
77 if (add_null) {
78 //! We are setting a validity mask of the data part of dictionary vector
79 //! For some reason, Nulls are allowed to be indexes, hence we need to set the last element here to be null
80 //! We might have to resize the mask
81 mask.Resize(old_size: size, new_size: size + 1);
82 mask.SetInvalid(size);
83 }
84}
85
86static void SetValidityMask(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
87 int64_t nested_offset, bool add_null = false) {
88 D_ASSERT(vector.GetVectorType() == VectorType::FLAT_VECTOR);
89 auto &mask = FlatVector::Validity(vector);
90 GetValidityMask(mask, array, scan_state, size, nested_offset, add_null);
91}
92
// Forward declaration: ColumnArrowToDuckDB and the nested-type converters below (e.g. ArrowToDuckDBList) call each
// other recursively. The default arguments are declared here, so the definition further down does not repeat them.
// nested_offset == -1 means "top-level" (rows start at scan_state.chunk_offset); any other value is a row position
// inside `array` requested by a parent list/struct. parent_mask, when set, is the validity mask of an enclosing struct.
static void ColumnArrowToDuckDB(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
                                std::unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
                                idx_t col_idx, ArrowConvertDataIndices &arrow_convert_idx, int64_t nested_offset = -1,
                                ValidityMask *parent_mask = nullptr);
97
98static void ArrowToDuckDBList(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
99 std::unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
100 idx_t col_idx, ArrowConvertDataIndices &arrow_convert_idx, int64_t nested_offset,
101 ValidityMask *parent_mask) {
102 auto original_type = arrow_convert_data[col_idx]->variable_sz_type[arrow_convert_idx.variable_sized_index++];
103 idx_t list_size = 0;
104 SetValidityMask(vector, array, scan_state, size, nested_offset);
105 idx_t start_offset = 0;
106 idx_t cur_offset = 0;
107 if (original_type.first == ArrowVariableSizeType::FIXED_SIZE) {
108 //! Have to check validity mask before setting this up
109 idx_t offset = (scan_state.chunk_offset + array.offset) * original_type.second;
110 if (nested_offset != -1) {
111 offset = original_type.second * nested_offset;
112 }
113 start_offset = offset;
114 auto list_data = FlatVector::GetData<list_entry_t>(vector);
115 for (idx_t i = 0; i < size; i++) {
116 auto &le = list_data[i];
117 le.offset = cur_offset;
118 le.length = original_type.second;
119 cur_offset += original_type.second;
120 }
121 list_size = start_offset + cur_offset;
122 } else if (original_type.first == ArrowVariableSizeType::NORMAL) {
123 auto offsets = ArrowBufferData<uint32_t>(array, buffer_idx: 1) + array.offset + scan_state.chunk_offset;
124 if (nested_offset != -1) {
125 offsets = ArrowBufferData<uint32_t>(array, buffer_idx: 1) + nested_offset;
126 }
127 start_offset = offsets[0];
128 auto list_data = FlatVector::GetData<list_entry_t>(vector);
129 for (idx_t i = 0; i < size; i++) {
130 auto &le = list_data[i];
131 le.offset = cur_offset;
132 le.length = offsets[i + 1] - offsets[i];
133 cur_offset += le.length;
134 }
135 list_size = offsets[size];
136 } else {
137 auto offsets = ArrowBufferData<uint64_t>(array, buffer_idx: 1) + array.offset + scan_state.chunk_offset;
138 if (nested_offset != -1) {
139 offsets = ArrowBufferData<uint64_t>(array, buffer_idx: 1) + nested_offset;
140 }
141 start_offset = offsets[0];
142 auto list_data = FlatVector::GetData<list_entry_t>(vector);
143 for (idx_t i = 0; i < size; i++) {
144 auto &le = list_data[i];
145 le.offset = cur_offset;
146 le.length = offsets[i + 1] - offsets[i];
147 cur_offset += le.length;
148 }
149 list_size = offsets[size];
150 }
151 list_size -= start_offset;
152 ListVector::Reserve(vec&: vector, required_capacity: list_size);
153 ListVector::SetListSize(vec&: vector, size: list_size);
154 auto &child_vector = ListVector::GetEntry(vector);
155 SetValidityMask(vector&: child_vector, array&: *array.children[0], scan_state, size: list_size, nested_offset: start_offset);
156 auto &list_mask = FlatVector::Validity(vector);
157 if (parent_mask) {
158 //! Since this List is owned by a struct we must guarantee their validity map matches on Null
159 if (!parent_mask->AllValid()) {
160 for (idx_t i = 0; i < size; i++) {
161 if (!parent_mask->RowIsValid(row_idx: i)) {
162 list_mask.SetInvalid(i);
163 }
164 }
165 }
166 }
167 if (list_size == 0 && start_offset == 0) {
168 ColumnArrowToDuckDB(vector&: child_vector, array&: *array.children[0], scan_state, size: list_size, arrow_convert_data, col_idx,
169 arrow_convert_idx, nested_offset: -1);
170 } else {
171 ColumnArrowToDuckDB(vector&: child_vector, array&: *array.children[0], scan_state, size: list_size, arrow_convert_data, col_idx,
172 arrow_convert_idx, nested_offset: start_offset);
173 }
174}
175
176static void ArrowToDuckDBBlob(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
177 std::unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
178 idx_t col_idx, ArrowConvertDataIndices &arrow_convert_idx, int64_t nested_offset) {
179 auto original_type = arrow_convert_data[col_idx]->variable_sz_type[arrow_convert_idx.variable_sized_index++];
180 SetValidityMask(vector, array, scan_state, size, nested_offset);
181 if (original_type.first == ArrowVariableSizeType::FIXED_SIZE) {
182 //! Have to check validity mask before setting this up
183 idx_t offset = (scan_state.chunk_offset + array.offset) * original_type.second;
184 if (nested_offset != -1) {
185 offset = original_type.second * nested_offset;
186 }
187 auto cdata = ArrowBufferData<char>(array, buffer_idx: 1);
188 for (idx_t row_idx = 0; row_idx < size; row_idx++) {
189 if (FlatVector::IsNull(vector, idx: row_idx)) {
190 continue;
191 }
192 auto bptr = cdata + offset;
193 auto blob_len = original_type.second;
194 FlatVector::GetData<string_t>(vector)[row_idx] = StringVector::AddStringOrBlob(vector, data: bptr, len: blob_len);
195 offset += blob_len;
196 }
197 } else if (original_type.first == ArrowVariableSizeType::NORMAL) {
198 auto offsets = ArrowBufferData<uint32_t>(array, buffer_idx: 1) + array.offset + scan_state.chunk_offset;
199 if (nested_offset != -1) {
200 offsets = ArrowBufferData<uint32_t>(array, buffer_idx: 1) + array.offset + nested_offset;
201 }
202 auto cdata = ArrowBufferData<char>(array, buffer_idx: 2);
203 for (idx_t row_idx = 0; row_idx < size; row_idx++) {
204 if (FlatVector::IsNull(vector, idx: row_idx)) {
205 continue;
206 }
207 auto bptr = cdata + offsets[row_idx];
208 auto blob_len = offsets[row_idx + 1] - offsets[row_idx];
209 FlatVector::GetData<string_t>(vector)[row_idx] = StringVector::AddStringOrBlob(vector, data: bptr, len: blob_len);
210 }
211 } else {
212 //! Check if last offset is higher than max uint32
213 if (ArrowBufferData<uint64_t>(array, buffer_idx: 1)[array.length] > NumericLimits<uint32_t>::Maximum()) { // LCOV_EXCL_START
214 throw ConversionException("DuckDB does not support Blobs over 4GB");
215 } // LCOV_EXCL_STOP
216 auto offsets = ArrowBufferData<uint64_t>(array, buffer_idx: 1) + array.offset + scan_state.chunk_offset;
217 if (nested_offset != -1) {
218 offsets = ArrowBufferData<uint64_t>(array, buffer_idx: 1) + array.offset + nested_offset;
219 }
220 auto cdata = ArrowBufferData<char>(array, buffer_idx: 2);
221 for (idx_t row_idx = 0; row_idx < size; row_idx++) {
222 if (FlatVector::IsNull(vector, idx: row_idx)) {
223 continue;
224 }
225 auto bptr = cdata + offsets[row_idx];
226 auto blob_len = offsets[row_idx + 1] - offsets[row_idx];
227 FlatVector::GetData<string_t>(vector)[row_idx] = StringVector::AddStringOrBlob(vector, data: bptr, len: blob_len);
228 }
229 }
230}
231
232static void ArrowToDuckDBMapVerify(Vector &vector, idx_t count) {
233 auto valid_check = MapVector::CheckMapValidity(map&: vector, count);
234 switch (valid_check) {
235 case MapInvalidReason::VALID:
236 break;
237 case MapInvalidReason::DUPLICATE_KEY: {
238 throw InvalidInputException("Arrow map contains duplicate key, which isn't supported by DuckDB map type");
239 }
240 case MapInvalidReason::NULL_KEY: {
241 throw InvalidInputException("Arrow map contains NULL as map key, which isn't supported by DuckDB map type");
242 }
243 case MapInvalidReason::NULL_KEY_LIST: {
244 throw InvalidInputException("Arrow map contains NULL as key list, which isn't supported by DuckDB map type");
245 }
246 default: {
247 throw InternalException("MapInvalidReason not implemented");
248 }
249 }
250}
251
252template <class T>
253static void SetVectorString(Vector &vector, idx_t size, char *cdata, T *offsets) {
254 auto strings = FlatVector::GetData<string_t>(vector);
255 for (idx_t row_idx = 0; row_idx < size; row_idx++) {
256 if (FlatVector::IsNull(vector, idx: row_idx)) {
257 continue;
258 }
259 auto cptr = cdata + offsets[row_idx];
260 auto str_len = offsets[row_idx + 1] - offsets[row_idx];
261 if (str_len > NumericLimits<uint32_t>::Maximum()) { // LCOV_EXCL_START
262 throw ConversionException("DuckDB does not support Strings over 4GB");
263 } // LCOV_EXCL_STOP
264 strings[row_idx] = string_t(cptr, str_len);
265 }
266}
267
268static void DirectConversion(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
269 int64_t nested_offset) {
270 auto internal_type = GetTypeIdSize(type: vector.GetType().InternalType());
271 auto data_ptr = ArrowBufferData<data_t>(array, buffer_idx: 1) + internal_type * (scan_state.chunk_offset + array.offset);
272 if (nested_offset != -1) {
273 data_ptr = ArrowBufferData<data_t>(array, buffer_idx: 1) + internal_type * (array.offset + nested_offset);
274 }
275 FlatVector::SetData(vector, data: data_ptr);
276}
277
278template <class T>
279static void TimeConversion(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, int64_t nested_offset,
280 idx_t size, int64_t conversion) {
281 auto tgt_ptr = FlatVector::GetData<dtime_t>(vector);
282 auto &validity_mask = FlatVector::Validity(vector);
283 auto src_ptr = (T *)array.buffers[1] + scan_state.chunk_offset + array.offset;
284 if (nested_offset != -1) {
285 src_ptr = (T *)array.buffers[1] + nested_offset + array.offset;
286 }
287 for (idx_t row = 0; row < size; row++) {
288 if (!validity_mask.RowIsValid(row_idx: row)) {
289 continue;
290 }
291 if (!TryMultiplyOperator::Operation(left: (int64_t)src_ptr[row], right: conversion, result&: tgt_ptr[row].micros)) {
292 throw ConversionException("Could not convert Time to Microsecond");
293 }
294 }
295}
296
297static void TimestampTZConversion(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
298 int64_t nested_offset, idx_t size, int64_t conversion) {
299 auto tgt_ptr = FlatVector::GetData<timestamp_t>(vector);
300 auto &validity_mask = FlatVector::Validity(vector);
301 auto src_ptr = ArrowBufferData<int64_t>(array, buffer_idx: 1) + scan_state.chunk_offset + array.offset;
302 if (nested_offset != -1) {
303 src_ptr = ArrowBufferData<int64_t>(array, buffer_idx: 1) + nested_offset + array.offset;
304 }
305 for (idx_t row = 0; row < size; row++) {
306 if (!validity_mask.RowIsValid(row_idx: row)) {
307 continue;
308 }
309 if (!TryMultiplyOperator::Operation(left: src_ptr[row], right: conversion, result&: tgt_ptr[row].value)) {
310 throw ConversionException("Could not convert TimestampTZ to Microsecond");
311 }
312 }
313}
314
315static void IntervalConversionUs(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
316 int64_t nested_offset, idx_t size, int64_t conversion) {
317 auto tgt_ptr = FlatVector::GetData<interval_t>(vector);
318 auto src_ptr = ArrowBufferData<int64_t>(array, buffer_idx: 1) + scan_state.chunk_offset + array.offset;
319 if (nested_offset != -1) {
320 src_ptr = ArrowBufferData<int64_t>(array, buffer_idx: 1) + nested_offset + array.offset;
321 }
322 for (idx_t row = 0; row < size; row++) {
323 tgt_ptr[row].days = 0;
324 tgt_ptr[row].months = 0;
325 if (!TryMultiplyOperator::Operation(left: src_ptr[row], right: conversion, result&: tgt_ptr[row].micros)) {
326 throw ConversionException("Could not convert Interval to Microsecond");
327 }
328 }
329}
330
331static void IntervalConversionMonths(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
332 int64_t nested_offset, idx_t size) {
333 auto tgt_ptr = FlatVector::GetData<interval_t>(vector);
334 auto src_ptr = ArrowBufferData<int32_t>(array, buffer_idx: 1) + scan_state.chunk_offset + array.offset;
335 if (nested_offset != -1) {
336 src_ptr = ArrowBufferData<int32_t>(array, buffer_idx: 1) + nested_offset + array.offset;
337 }
338 for (idx_t row = 0; row < size; row++) {
339 tgt_ptr[row].days = 0;
340 tgt_ptr[row].micros = 0;
341 tgt_ptr[row].months = src_ptr[row];
342 }
343}
344
345static void IntervalConversionMonthDayNanos(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
346 int64_t nested_offset, idx_t size) {
347 auto tgt_ptr = FlatVector::GetData<interval_t>(vector);
348 auto src_ptr = ArrowBufferData<ArrowInterval>(array, buffer_idx: 1) + scan_state.chunk_offset + array.offset;
349 if (nested_offset != -1) {
350 src_ptr = ArrowBufferData<ArrowInterval>(array, buffer_idx: 1) + nested_offset + array.offset;
351 }
352 for (idx_t row = 0; row < size; row++) {
353 tgt_ptr[row].days = src_ptr[row].days;
354 tgt_ptr[row].micros = src_ptr[row].nanoseconds / Interval::NANOS_PER_MICRO;
355 tgt_ptr[row].months = src_ptr[row].months;
356 }
357}
358
359static void ColumnArrowToDuckDB(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
360 std::unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
361 idx_t col_idx, ArrowConvertDataIndices &arrow_convert_idx, int64_t nested_offset,
362 ValidityMask *parent_mask) {
363 switch (vector.GetType().id()) {
364 case LogicalTypeId::SQLNULL:
365 vector.Reference(value: Value());
366 break;
367 case LogicalTypeId::BOOLEAN: {
368 //! Arrow bit-packs boolean values
369 //! Lets first figure out where we are in the source array
370 auto src_ptr = ArrowBufferData<uint8_t>(array, buffer_idx: 1) + (scan_state.chunk_offset + array.offset) / 8;
371
372 if (nested_offset != -1) {
373 src_ptr = ArrowBufferData<uint8_t>(array, buffer_idx: 1) + (nested_offset + array.offset) / 8;
374 }
375 auto tgt_ptr = (uint8_t *)FlatVector::GetData(vector);
376 int src_pos = 0;
377 idx_t cur_bit = scan_state.chunk_offset % 8;
378 if (nested_offset != -1) {
379 cur_bit = nested_offset % 8;
380 }
381 for (idx_t row = 0; row < size; row++) {
382 if ((src_ptr[src_pos] & (1 << cur_bit)) == 0) {
383 tgt_ptr[row] = 0;
384 } else {
385 tgt_ptr[row] = 1;
386 }
387 cur_bit++;
388 if (cur_bit == 8) {
389 src_pos++;
390 cur_bit = 0;
391 }
392 }
393 break;
394 }
395 case LogicalTypeId::TINYINT:
396 case LogicalTypeId::SMALLINT:
397 case LogicalTypeId::INTEGER:
398 case LogicalTypeId::FLOAT:
399 case LogicalTypeId::DOUBLE:
400 case LogicalTypeId::UTINYINT:
401 case LogicalTypeId::USMALLINT:
402 case LogicalTypeId::UINTEGER:
403 case LogicalTypeId::UBIGINT:
404 case LogicalTypeId::BIGINT:
405 case LogicalTypeId::HUGEINT:
406 case LogicalTypeId::TIMESTAMP:
407 case LogicalTypeId::TIMESTAMP_SEC:
408 case LogicalTypeId::TIMESTAMP_MS:
409 case LogicalTypeId::TIMESTAMP_NS: {
410 DirectConversion(vector, array, scan_state, nested_offset);
411 break;
412 }
413 case LogicalTypeId::VARCHAR: {
414 auto original_type = arrow_convert_data[col_idx]->variable_sz_type[arrow_convert_idx.variable_sized_index++];
415 auto cdata = ArrowBufferData<char>(array, buffer_idx: 2);
416 if (original_type.first == ArrowVariableSizeType::SUPER_SIZE) {
417 auto offsets = ArrowBufferData<uint64_t>(array, buffer_idx: 1) + array.offset + scan_state.chunk_offset;
418 if (nested_offset != -1) {
419 offsets = ArrowBufferData<uint64_t>(array, buffer_idx: 1) + array.offset + nested_offset;
420 }
421 SetVectorString(vector, size, cdata, offsets);
422 } else {
423 auto offsets = ArrowBufferData<uint32_t>(array, buffer_idx: 1) + array.offset + scan_state.chunk_offset;
424 if (nested_offset != -1) {
425 offsets = ArrowBufferData<uint32_t>(array, buffer_idx: 1) + array.offset + nested_offset;
426 }
427 SetVectorString(vector, size, cdata, offsets);
428 }
429 break;
430 }
431 case LogicalTypeId::DATE: {
432 auto precision = arrow_convert_data[col_idx]->date_time_precision[arrow_convert_idx.datetime_precision_index++];
433 switch (precision) {
434 case ArrowDateTimeType::DAYS: {
435 DirectConversion(vector, array, scan_state, nested_offset);
436 break;
437 }
438 case ArrowDateTimeType::MILLISECONDS: {
439 //! convert date from nanoseconds to days
440 auto src_ptr = ArrowBufferData<uint64_t>(array, buffer_idx: 1) + scan_state.chunk_offset + array.offset;
441 if (nested_offset != -1) {
442 src_ptr = ArrowBufferData<uint64_t>(array, buffer_idx: 1) + nested_offset + array.offset;
443 }
444 auto tgt_ptr = FlatVector::GetData<date_t>(vector);
445 for (idx_t row = 0; row < size; row++) {
446 tgt_ptr[row] = date_t(int64_t(src_ptr[row]) / static_cast<int64_t>(1000 * 60 * 60 * 24));
447 }
448 break;
449 }
450 default:
451 throw NotImplementedException("Unsupported precision for Date Type ");
452 }
453 break;
454 }
455 case LogicalTypeId::TIME: {
456 auto precision = arrow_convert_data[col_idx]->date_time_precision[arrow_convert_idx.datetime_precision_index++];
457 switch (precision) {
458 case ArrowDateTimeType::SECONDS: {
459 TimeConversion<int32_t>(vector, array, scan_state, nested_offset, size, conversion: 1000000);
460 break;
461 }
462 case ArrowDateTimeType::MILLISECONDS: {
463 TimeConversion<int32_t>(vector, array, scan_state, nested_offset, size, conversion: 1000);
464 break;
465 }
466 case ArrowDateTimeType::MICROSECONDS: {
467 TimeConversion<int64_t>(vector, array, scan_state, nested_offset, size, conversion: 1);
468 break;
469 }
470 case ArrowDateTimeType::NANOSECONDS: {
471 auto tgt_ptr = FlatVector::GetData<dtime_t>(vector);
472 auto src_ptr = ArrowBufferData<int64_t>(array, buffer_idx: 1) + scan_state.chunk_offset + array.offset;
473 if (nested_offset != -1) {
474 src_ptr = ArrowBufferData<int64_t>(array, buffer_idx: 1) + nested_offset + array.offset;
475 }
476 for (idx_t row = 0; row < size; row++) {
477 tgt_ptr[row].micros = src_ptr[row] / 1000;
478 }
479 break;
480 }
481 default:
482 throw NotImplementedException("Unsupported precision for Time Type ");
483 }
484 break;
485 }
486 case LogicalTypeId::TIMESTAMP_TZ: {
487 auto precision = arrow_convert_data[col_idx]->date_time_precision[arrow_convert_idx.datetime_precision_index++];
488 switch (precision) {
489 case ArrowDateTimeType::SECONDS: {
490 TimestampTZConversion(vector, array, scan_state, nested_offset, size, conversion: 1000000);
491 break;
492 }
493 case ArrowDateTimeType::MILLISECONDS: {
494 TimestampTZConversion(vector, array, scan_state, nested_offset, size, conversion: 1000);
495 break;
496 }
497 case ArrowDateTimeType::MICROSECONDS: {
498 DirectConversion(vector, array, scan_state, nested_offset);
499 break;
500 }
501 case ArrowDateTimeType::NANOSECONDS: {
502 auto tgt_ptr = FlatVector::GetData<timestamp_t>(vector);
503 auto src_ptr = ArrowBufferData<int64_t>(array, buffer_idx: 1) + scan_state.chunk_offset + array.offset;
504 if (nested_offset != -1) {
505 src_ptr = ArrowBufferData<int64_t>(array, buffer_idx: 1) + nested_offset + array.offset;
506 }
507 for (idx_t row = 0; row < size; row++) {
508 tgt_ptr[row].value = src_ptr[row] / 1000;
509 }
510 break;
511 }
512 default:
513 throw NotImplementedException("Unsupported precision for TimestampTZ Type ");
514 }
515 break;
516 }
517 case LogicalTypeId::INTERVAL: {
518 auto precision = arrow_convert_data[col_idx]->date_time_precision[arrow_convert_idx.datetime_precision_index++];
519 switch (precision) {
520 case ArrowDateTimeType::SECONDS: {
521 IntervalConversionUs(vector, array, scan_state, nested_offset, size, conversion: 1000000);
522 break;
523 }
524 case ArrowDateTimeType::DAYS:
525 case ArrowDateTimeType::MILLISECONDS: {
526 IntervalConversionUs(vector, array, scan_state, nested_offset, size, conversion: 1000);
527 break;
528 }
529 case ArrowDateTimeType::MICROSECONDS: {
530 IntervalConversionUs(vector, array, scan_state, nested_offset, size, conversion: 1);
531 break;
532 }
533 case ArrowDateTimeType::NANOSECONDS: {
534 auto tgt_ptr = FlatVector::GetData<interval_t>(vector);
535 auto src_ptr = ArrowBufferData<int64_t>(array, buffer_idx: 1) + scan_state.chunk_offset + array.offset;
536 if (nested_offset != -1) {
537 src_ptr = ArrowBufferData<int64_t>(array, buffer_idx: 1) + nested_offset + array.offset;
538 }
539 for (idx_t row = 0; row < size; row++) {
540 tgt_ptr[row].micros = src_ptr[row] / 1000;
541 tgt_ptr[row].days = 0;
542 tgt_ptr[row].months = 0;
543 }
544 break;
545 }
546 case ArrowDateTimeType::MONTHS: {
547 IntervalConversionMonths(vector, array, scan_state, nested_offset, size);
548 break;
549 }
550 case ArrowDateTimeType::MONTH_DAY_NANO: {
551 IntervalConversionMonthDayNanos(vector, array, scan_state, nested_offset, size);
552 break;
553 }
554 default:
555 throw NotImplementedException("Unsupported precision for Interval/Duration Type ");
556 }
557 break;
558 }
559 case LogicalTypeId::DECIMAL: {
560 auto val_mask = FlatVector::Validity(vector);
561 //! We have to convert from INT128
562 auto src_ptr = ArrowBufferData<hugeint_t>(array, buffer_idx: 1) + scan_state.chunk_offset + array.offset;
563 if (nested_offset != -1) {
564 src_ptr = ArrowBufferData<hugeint_t>(array, buffer_idx: 1) + nested_offset + array.offset;
565 }
566 switch (vector.GetType().InternalType()) {
567 case PhysicalType::INT16: {
568 auto tgt_ptr = FlatVector::GetData<int16_t>(vector);
569 for (idx_t row = 0; row < size; row++) {
570 if (val_mask.RowIsValid(row_idx: row)) {
571 auto result = Hugeint::TryCast(input: src_ptr[row], result&: tgt_ptr[row]);
572 D_ASSERT(result);
573 (void)result;
574 }
575 }
576 break;
577 }
578 case PhysicalType::INT32: {
579 auto tgt_ptr = FlatVector::GetData<int32_t>(vector);
580 for (idx_t row = 0; row < size; row++) {
581 if (val_mask.RowIsValid(row_idx: row)) {
582 auto result = Hugeint::TryCast(input: src_ptr[row], result&: tgt_ptr[row]);
583 D_ASSERT(result);
584 (void)result;
585 }
586 }
587 break;
588 }
589 case PhysicalType::INT64: {
590 auto tgt_ptr = FlatVector::GetData<int64_t>(vector);
591 for (idx_t row = 0; row < size; row++) {
592 if (val_mask.RowIsValid(row_idx: row)) {
593 auto result = Hugeint::TryCast(input: src_ptr[row], result&: tgt_ptr[row]);
594 D_ASSERT(result);
595 (void)result;
596 }
597 }
598 break;
599 }
600 case PhysicalType::INT128: {
601 FlatVector::SetData(vector,
602 data: ArrowBufferData<data_t>(array, buffer_idx: 1) + GetTypeIdSize(type: vector.GetType().InternalType()) *
603 (scan_state.chunk_offset + array.offset));
604 break;
605 }
606 default:
607 throw NotImplementedException("Unsupported physical type for Decimal: %s",
608 TypeIdToString(type: vector.GetType().InternalType()));
609 }
610 break;
611 }
612 case LogicalTypeId::BLOB: {
613 ArrowToDuckDBBlob(vector, array, scan_state, size, arrow_convert_data, col_idx, arrow_convert_idx,
614 nested_offset);
615 break;
616 }
617 case LogicalTypeId::LIST: {
618 ArrowToDuckDBList(vector, array, scan_state, size, arrow_convert_data, col_idx, arrow_convert_idx,
619 nested_offset, parent_mask);
620 break;
621 }
622 case LogicalTypeId::MAP: {
623 ArrowToDuckDBList(vector, array, scan_state, size, arrow_convert_data, col_idx, arrow_convert_idx,
624 nested_offset, parent_mask);
625 ArrowToDuckDBMapVerify(vector, count: size);
626 break;
627 }
628 case LogicalTypeId::STRUCT: {
629 //! Fill the children
630 auto &child_entries = StructVector::GetEntries(vector);
631 auto &struct_validity_mask = FlatVector::Validity(vector);
632 for (idx_t type_idx = 0; type_idx < (idx_t)array.n_children; type_idx++) {
633 SetValidityMask(vector&: *child_entries[type_idx], array&: *array.children[type_idx], scan_state, size, nested_offset);
634 if (!struct_validity_mask.AllValid()) {
635 auto &child_validity_mark = FlatVector::Validity(vector&: *child_entries[type_idx]);
636 for (idx_t i = 0; i < size; i++) {
637 if (!struct_validity_mask.RowIsValid(row_idx: i)) {
638 child_validity_mark.SetInvalid(i);
639 }
640 }
641 }
642 ColumnArrowToDuckDB(vector&: *child_entries[type_idx], array&: *array.children[type_idx], scan_state, size,
643 arrow_convert_data, col_idx, arrow_convert_idx, nested_offset, parent_mask: &struct_validity_mask);
644 }
645 break;
646 }
647 default:
648 throw NotImplementedException("Unsupported type %s", vector.GetType().ToString());
649 }
650}
651
652template <class T>
653static void SetSelectionVectorLoop(SelectionVector &sel, data_ptr_t indices_p, idx_t size) {
654 auto indices = reinterpret_cast<T *>(indices_p);
655 for (idx_t row = 0; row < size; row++) {
656 sel.set_index(idx: row, loc: indices[row]);
657 }
658}
659
660template <class T>
661static void SetSelectionVectorLoopWithChecks(SelectionVector &sel, data_ptr_t indices_p, idx_t size) {
662
663 auto indices = reinterpret_cast<T *>(indices_p);
664 for (idx_t row = 0; row < size; row++) {
665 if (indices[row] > NumericLimits<uint32_t>::Maximum()) {
666 throw ConversionException("DuckDB only supports indices that fit on an uint32");
667 }
668 sel.set_index(idx: row, loc: indices[row]);
669 }
670}
671
672template <class T>
673static void SetMaskedSelectionVectorLoop(SelectionVector &sel, data_ptr_t indices_p, idx_t size, ValidityMask &mask,
674 idx_t last_element_pos) {
675 auto indices = reinterpret_cast<T *>(indices_p);
676 for (idx_t row = 0; row < size; row++) {
677 if (mask.RowIsValid(row_idx: row)) {
678 sel.set_index(idx: row, loc: indices[row]);
679 } else {
680 //! Need to point out to last element
681 sel.set_index(idx: row, loc: last_element_pos);
682 }
683 }
684}
685
686static void SetSelectionVector(SelectionVector &sel, data_ptr_t indices_p, LogicalType &logical_type, idx_t size,
687 ValidityMask *mask = nullptr, idx_t last_element_pos = 0) {
688 sel.Initialize(count: size);
689
690 if (mask) {
691 switch (logical_type.id()) {
692 case LogicalTypeId::UTINYINT:
693 SetMaskedSelectionVectorLoop<uint8_t>(sel, indices_p, size, mask&: *mask, last_element_pos);
694 break;
695 case LogicalTypeId::TINYINT:
696 SetMaskedSelectionVectorLoop<int8_t>(sel, indices_p, size, mask&: *mask, last_element_pos);
697 break;
698 case LogicalTypeId::USMALLINT:
699 SetMaskedSelectionVectorLoop<uint16_t>(sel, indices_p, size, mask&: *mask, last_element_pos);
700 break;
701 case LogicalTypeId::SMALLINT:
702 SetMaskedSelectionVectorLoop<int16_t>(sel, indices_p, size, mask&: *mask, last_element_pos);
703 break;
704 case LogicalTypeId::UINTEGER:
705 if (last_element_pos > NumericLimits<uint32_t>::Maximum()) {
706 //! Its guaranteed that our indices will point to the last element, so just throw an error
707 throw ConversionException("DuckDB only supports indices that fit on an uint32");
708 }
709 SetMaskedSelectionVectorLoop<uint32_t>(sel, indices_p, size, mask&: *mask, last_element_pos);
710 break;
711 case LogicalTypeId::INTEGER:
712 SetMaskedSelectionVectorLoop<int32_t>(sel, indices_p, size, mask&: *mask, last_element_pos);
713 break;
714 case LogicalTypeId::UBIGINT:
715 if (last_element_pos > NumericLimits<uint32_t>::Maximum()) {
716 //! Its guaranteed that our indices will point to the last element, so just throw an error
717 throw ConversionException("DuckDB only supports indices that fit on an uint32");
718 }
719 SetMaskedSelectionVectorLoop<uint64_t>(sel, indices_p, size, mask&: *mask, last_element_pos);
720 break;
721 case LogicalTypeId::BIGINT:
722 if (last_element_pos > NumericLimits<uint32_t>::Maximum()) {
723 //! Its guaranteed that our indices will point to the last element, so just throw an error
724 throw ConversionException("DuckDB only supports indices that fit on an uint32");
725 }
726 SetMaskedSelectionVectorLoop<int64_t>(sel, indices_p, size, mask&: *mask, last_element_pos);
727 break;
728
729 default:
730 throw NotImplementedException("(Arrow) Unsupported type for selection vectors %s", logical_type.ToString());
731 }
732
733 } else {
734 switch (logical_type.id()) {
735 case LogicalTypeId::UTINYINT:
736 SetSelectionVectorLoop<uint8_t>(sel, indices_p, size);
737 break;
738 case LogicalTypeId::TINYINT:
739 SetSelectionVectorLoop<int8_t>(sel, indices_p, size);
740 break;
741 case LogicalTypeId::USMALLINT:
742 SetSelectionVectorLoop<uint16_t>(sel, indices_p, size);
743 break;
744 case LogicalTypeId::SMALLINT:
745 SetSelectionVectorLoop<int16_t>(sel, indices_p, size);
746 break;
747 case LogicalTypeId::UINTEGER:
748 SetSelectionVectorLoop<uint32_t>(sel, indices_p, size);
749 break;
750 case LogicalTypeId::INTEGER:
751 SetSelectionVectorLoop<int32_t>(sel, indices_p, size);
752 break;
753 case LogicalTypeId::UBIGINT:
754 if (last_element_pos > NumericLimits<uint32_t>::Maximum()) {
755 //! We need to check if our indexes fit in a uint32_t
756 SetSelectionVectorLoopWithChecks<uint64_t>(sel, indices_p, size);
757 } else {
758 SetSelectionVectorLoop<uint64_t>(sel, indices_p, size);
759 }
760 break;
761 case LogicalTypeId::BIGINT:
762 if (last_element_pos > NumericLimits<uint32_t>::Maximum()) {
763 //! We need to check if our indexes fit in a uint32_t
764 SetSelectionVectorLoopWithChecks<int64_t>(sel, indices_p, size);
765 } else {
766 SetSelectionVectorLoop<int64_t>(sel, indices_p, size);
767 }
768 break;
769 default:
770 throw ConversionException("(Arrow) Unsupported type for selection vectors %s", logical_type.ToString());
771 }
772 }
773}
774
775static void ColumnArrowToDuckDBDictionary(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
776 idx_t size,
777 std::unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
778 idx_t col_idx, ArrowConvertDataIndices &arrow_convert_idx) {
779 SelectionVector sel;
780 auto &dict_vectors = scan_state.arrow_dictionary_vectors;
781 if (!dict_vectors.count(x: col_idx)) {
782 //! We need to set the dictionary data for this column
783 auto base_vector = make_uniq<Vector>(args: vector.GetType(), args&: array.dictionary->length);
784 SetValidityMask(vector&: *base_vector, array&: *array.dictionary, scan_state, size: array.dictionary->length, nested_offset: 0, add_null: array.null_count > 0);
785 ColumnArrowToDuckDB(vector&: *base_vector, array&: *array.dictionary, scan_state, size: array.dictionary->length, arrow_convert_data,
786 col_idx, arrow_convert_idx);
787 dict_vectors[col_idx] = std::move(base_vector);
788 }
789 auto dictionary_type = arrow_convert_data[col_idx]->dictionary_type;
790 //! Get Pointer to Indices of Dictionary
791 auto indices = ArrowBufferData<data_t>(array, buffer_idx: 1) +
792 GetTypeIdSize(type: dictionary_type.InternalType()) * (scan_state.chunk_offset + array.offset);
793 if (array.null_count > 0) {
794 ValidityMask indices_validity;
795 GetValidityMask(mask&: indices_validity, array, scan_state, size);
796 SetSelectionVector(sel, indices_p: indices, logical_type&: dictionary_type, size, mask: &indices_validity, last_element_pos: array.dictionary->length);
797 } else {
798 SetSelectionVector(sel, indices_p: indices, logical_type&: dictionary_type, size);
799 }
800 vector.Slice(other&: *dict_vectors[col_idx], sel, count: size);
801}
802
803void ArrowTableFunction::ArrowToDuckDB(ArrowScanLocalState &scan_state,
804 unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
805 DataChunk &output, idx_t start, bool arrow_scan_is_projected) {
806 for (idx_t idx = 0; idx < output.ColumnCount(); idx++) {
807 auto col_idx = scan_state.column_ids[idx];
808
809 // If projection was not pushed down into the arrow scanner, but projection pushdown is enabled on the
810 // table function, we need to use original column ids here.
811 auto arrow_array_idx = arrow_scan_is_projected ? idx : col_idx;
812
813 if (col_idx == COLUMN_IDENTIFIER_ROW_ID) {
814 // This column is skipped by the projection pushdown
815 continue;
816 }
817
818 ArrowConvertDataIndices arrow_convert_idx {.variable_sized_index: 0, .datetime_precision_index: 0};
819 auto &array = *scan_state.chunk->arrow_array.children[arrow_array_idx];
820 if (!array.release) {
821 throw InvalidInputException("arrow_scan: released array passed");
822 }
823 if (array.length != scan_state.chunk->arrow_array.length) {
824 throw InvalidInputException("arrow_scan: array length mismatch");
825 }
826 // Make sure this Vector keeps the Arrow chunk alive in case we can zero-copy the data
827 output.data[idx].GetBuffer()->SetAuxiliaryData(make_uniq<ArrowAuxiliaryData>(args&: scan_state.chunk));
828 if (array.dictionary) {
829 ColumnArrowToDuckDBDictionary(vector&: output.data[idx], array, scan_state, size: output.size(), arrow_convert_data,
830 col_idx, arrow_convert_idx);
831 } else {
832 SetValidityMask(vector&: output.data[idx], array, scan_state, size: output.size(), nested_offset: -1);
833 ColumnArrowToDuckDB(vector&: output.data[idx], array, scan_state, size: output.size(), arrow_convert_data, col_idx,
834 arrow_convert_idx);
835 }
836 }
837}
838
839} // namespace duckdb
840