#include "duckdb/function/table/arrow.hpp"
#include "duckdb/common/limits.hpp"
#include "duckdb/common/operator/multiply.hpp"
#include "duckdb/common/types/hugeint.hpp"
#include "duckdb/common/types/arrow_aux_data.hpp"
#include "duckdb/function/scalar/nested_functions.hpp"

namespace {
using duckdb::idx_t;
struct ArrowConvertDataIndices {
	//! The index that refers to 'variable_sz_type' in ArrowConvertData
	idx_t variable_sized_index;
	//! The index that refers to 'date_time_precision' in ArrowConvertData
	idx_t datetime_precision_index;
};
} // namespace

namespace duckdb {

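//! Shifts a byte array of 'size' bytes right by 'shift' bits (carrying bits across byte boundaries);
//! used below to re-align Arrow validity bitmaps that do not start on a byte boundary.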
static void ShiftRight(unsigned char *ar, int size, int shift) {
	int carry = 0;
	while (shift--) {
		for (int i = size - 1; i >= 0; --i) {
			int next = (ar[i] & 1) ? 0x80 : 0;
			ar[i] = carry | (ar[i] >> 1);
			carry = next;
		}
	}
}

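//! Returns the Arrow buffer at 'buffer_idx' of the given array, cast to the requested type.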
template <class T>
T *ArrowBufferData(ArrowArray &array, idx_t buffer_idx) {
	return (T *)array.buffers[buffer_idx]; // NOLINT
}

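//! Copies the validity bitmap of an Arrow array into a DuckDB ValidityMask,
//! re-aligning the bitmap when the Arrow offset is not a multiple of 8.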
static void GetValidityMask(ValidityMask &mask, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
                            int64_t nested_offset = -1, bool add_null = false) {
	// In certain cases we don't need to, or cannot, copy arrow's validity mask to duckdb.
	//
	// The conditions where we do want to copy arrow's mask to duckdb are:
	// 1. nulls exist
	// 2. n_buffers > 0, meaning the array's arrow type is not `null`
	// 3. the validity buffer (the first buffer) is not a nullptr
	if (array.null_count != 0 && array.n_buffers > 0 && array.buffers[0]) {
		auto bit_offset = scan_state.chunk_offset + array.offset;
		if (nested_offset != -1) {
			bit_offset = nested_offset;
		}
		mask.EnsureWritable();
#if STANDARD_VECTOR_SIZE > 64
		auto n_bitmask_bytes = (size + 8 - 1) / 8;
		if (bit_offset % 8 == 0) {
			//! just memcpy nullmask
			memcpy((void *)mask.GetData(), ArrowBufferData<uint8_t>(array, 0) + bit_offset / 8, n_bitmask_bytes);
		} else {
			//! need to re-align nullmask
			vector<uint8_t> temp_nullmask(n_bitmask_bytes + 1);
			memcpy(temp_nullmask.data(), ArrowBufferData<uint8_t>(array, 0) + bit_offset / 8, n_bitmask_bytes + 1);
			ShiftRight(temp_nullmask.data(), n_bitmask_bytes + 1,
			           bit_offset % 8); //! why this has to be a right shift is a mystery to me
			memcpy((void *)mask.GetData(), data_ptr_cast(temp_nullmask.data()), n_bitmask_bytes);
		}
#else
		auto byte_offset = bit_offset / 8;
		auto source_data = ArrowBufferData<uint8_t>(array, 0);
		bit_offset %= 8;
		for (idx_t i = 0; i < size; i++) {
			mask.Set(i, source_data[byte_offset] & (1 << bit_offset));
			bit_offset++;
			if (bit_offset == 8) {
				bit_offset = 0;
				byte_offset++;
			}
		}
#endif
	}
	if (add_null) {
		//! We are setting the validity mask of the data part of a dictionary vector.
		//! For some reason NULLs are allowed to be used as indices, hence we need to set the last element here to be null.
		//! We might have to resize the mask.
		mask.Resize(size, size + 1);
		mask.SetInvalid(size);
	}
}

static void SetValidityMask(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
                            int64_t nested_offset, bool add_null = false) {
	D_ASSERT(vector.GetVectorType() == VectorType::FLAT_VECTOR);
	auto &mask = FlatVector::Validity(vector);
	GetValidityMask(mask, array, scan_state, size, nested_offset, add_null);
}

static void ColumnArrowToDuckDB(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
                                std::unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
                                idx_t col_idx, ArrowConvertDataIndices &arrow_convert_idx, int64_t nested_offset = -1,
                                ValidityMask *parent_mask = nullptr);

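//! Converts an Arrow list array (fixed-size, 32-bit or 64-bit offsets) into a DuckDB LIST vector:
//! the list entries are rebuilt from the Arrow offsets and the child array is converted recursively.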
static void ArrowToDuckDBList(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
                              std::unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
                              idx_t col_idx, ArrowConvertDataIndices &arrow_convert_idx, int64_t nested_offset,
                              ValidityMask *parent_mask) {
	auto original_type = arrow_convert_data[col_idx]->variable_sz_type[arrow_convert_idx.variable_sized_index++];
	idx_t list_size = 0;
	SetValidityMask(vector, array, scan_state, size, nested_offset);
	idx_t start_offset = 0;
	idx_t cur_offset = 0;
	if (original_type.first == ArrowVariableSizeType::FIXED_SIZE) {
		//! Have to check validity mask before setting this up
		idx_t offset = (scan_state.chunk_offset + array.offset) * original_type.second;
		if (nested_offset != -1) {
			offset = original_type.second * nested_offset;
		}
		start_offset = offset;
		auto list_data = FlatVector::GetData<list_entry_t>(vector);
		for (idx_t i = 0; i < size; i++) {
			auto &le = list_data[i];
			le.offset = cur_offset;
			le.length = original_type.second;
			cur_offset += original_type.second;
		}
		list_size = start_offset + cur_offset;
	} else if (original_type.first == ArrowVariableSizeType::NORMAL) {
		auto offsets = ArrowBufferData<uint32_t>(array, 1) + array.offset + scan_state.chunk_offset;
		if (nested_offset != -1) {
			offsets = ArrowBufferData<uint32_t>(array, 1) + nested_offset;
		}
		start_offset = offsets[0];
		auto list_data = FlatVector::GetData<list_entry_t>(vector);
		for (idx_t i = 0; i < size; i++) {
			auto &le = list_data[i];
			le.offset = cur_offset;
			le.length = offsets[i + 1] - offsets[i];
			cur_offset += le.length;
		}
		list_size = offsets[size];
	} else {
		auto offsets = ArrowBufferData<uint64_t>(array, 1) + array.offset + scan_state.chunk_offset;
		if (nested_offset != -1) {
			offsets = ArrowBufferData<uint64_t>(array, 1) + nested_offset;
		}
		start_offset = offsets[0];
		auto list_data = FlatVector::GetData<list_entry_t>(vector);
		for (idx_t i = 0; i < size; i++) {
			auto &le = list_data[i];
			le.offset = cur_offset;
			le.length = offsets[i + 1] - offsets[i];
			cur_offset += le.length;
		}
		list_size = offsets[size];
	}
	list_size -= start_offset;
	ListVector::Reserve(vector, list_size);
	ListVector::SetListSize(vector, list_size);
	auto &child_vector = ListVector::GetEntry(vector);
	SetValidityMask(child_vector, *array.children[0], scan_state, list_size, start_offset);
	auto &list_mask = FlatVector::Validity(vector);
	if (parent_mask) {
		//! Since this list is owned by a struct, we must guarantee that its validity mask matches the parent's on NULL
		if (!parent_mask->AllValid()) {
			for (idx_t i = 0; i < size; i++) {
				if (!parent_mask->RowIsValid(i)) {
					list_mask.SetInvalid(i);
				}
			}
		}
	}
	if (list_size == 0 && start_offset == 0) {
		ColumnArrowToDuckDB(child_vector, *array.children[0], scan_state, list_size, arrow_convert_data, col_idx,
		                    arrow_convert_idx, -1);
	} else {
		ColumnArrowToDuckDB(child_vector, *array.children[0], scan_state, list_size, arrow_convert_data, col_idx,
		                    arrow_convert_idx, start_offset);
	}
}

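//! Converts an Arrow binary array (fixed-size, 32-bit or 64-bit offsets) into a DuckDB BLOB vector.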
static void ArrowToDuckDBBlob(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
                              std::unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
                              idx_t col_idx, ArrowConvertDataIndices &arrow_convert_idx, int64_t nested_offset) {
	auto original_type = arrow_convert_data[col_idx]->variable_sz_type[arrow_convert_idx.variable_sized_index++];
	SetValidityMask(vector, array, scan_state, size, nested_offset);
	if (original_type.first == ArrowVariableSizeType::FIXED_SIZE) {
		//! Have to check validity mask before setting this up
		idx_t offset = (scan_state.chunk_offset + array.offset) * original_type.second;
		if (nested_offset != -1) {
			offset = original_type.second * nested_offset;
		}
		auto cdata = ArrowBufferData<char>(array, 1);
		for (idx_t row_idx = 0; row_idx < size; row_idx++) {
			if (FlatVector::IsNull(vector, row_idx)) {
				continue;
			}
			auto bptr = cdata + offset;
			auto blob_len = original_type.second;
			FlatVector::GetData<string_t>(vector)[row_idx] = StringVector::AddStringOrBlob(vector, bptr, blob_len);
			offset += blob_len;
		}
	} else if (original_type.first == ArrowVariableSizeType::NORMAL) {
		auto offsets = ArrowBufferData<uint32_t>(array, 1) + array.offset + scan_state.chunk_offset;
		if (nested_offset != -1) {
			offsets = ArrowBufferData<uint32_t>(array, 1) + array.offset + nested_offset;
		}
		auto cdata = ArrowBufferData<char>(array, 2);
		for (idx_t row_idx = 0; row_idx < size; row_idx++) {
			if (FlatVector::IsNull(vector, row_idx)) {
				continue;
			}
			auto bptr = cdata + offsets[row_idx];
			auto blob_len = offsets[row_idx + 1] - offsets[row_idx];
			FlatVector::GetData<string_t>(vector)[row_idx] = StringVector::AddStringOrBlob(vector, bptr, blob_len);
		}
	} else {
		//! Check if last offset is higher than max uint32
		if (ArrowBufferData<uint64_t>(array, 1)[array.length] > NumericLimits<uint32_t>::Maximum()) { // LCOV_EXCL_START
			throw ConversionException("DuckDB does not support Blobs over 4GB");
		} // LCOV_EXCL_STOP
		auto offsets = ArrowBufferData<uint64_t>(array, 1) + array.offset + scan_state.chunk_offset;
		if (nested_offset != -1) {
			offsets = ArrowBufferData<uint64_t>(array, 1) + array.offset + nested_offset;
		}
		auto cdata = ArrowBufferData<char>(array, 2);
		for (idx_t row_idx = 0; row_idx < size; row_idx++) {
			if (FlatVector::IsNull(vector, row_idx)) {
				continue;
			}
			auto bptr = cdata + offsets[row_idx];
			auto blob_len = offsets[row_idx + 1] - offsets[row_idx];
			FlatVector::GetData<string_t>(vector)[row_idx] = StringVector::AddStringOrBlob(vector, bptr, blob_len);
		}
	}
}

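//! Verifies that a converted MAP vector is valid, i.e. that it contains no NULL or duplicate keys.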
static void ArrowToDuckDBMapVerify(Vector &vector, idx_t count) {
	auto valid_check = MapVector::CheckMapValidity(vector, count);
	switch (valid_check) {
	case MapInvalidReason::VALID:
		break;
	case MapInvalidReason::DUPLICATE_KEY: {
		throw InvalidInputException("Arrow map contains duplicate key, which isn't supported by DuckDB map type");
	}
	case MapInvalidReason::NULL_KEY: {
		throw InvalidInputException("Arrow map contains NULL as map key, which isn't supported by DuckDB map type");
	}
	case MapInvalidReason::NULL_KEY_LIST: {
		throw InvalidInputException("Arrow map contains NULL as key list, which isn't supported by DuckDB map type");
	}
	default: {
		throw InternalException("MapInvalidReason not implemented");
	}
	}
}

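//! Fills a string vector with string_t values that point directly into the Arrow data buffer (zero-copy).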
template <class T>
static void SetVectorString(Vector &vector, idx_t size, char *cdata, T *offsets) {
	auto strings = FlatVector::GetData<string_t>(vector);
	for (idx_t row_idx = 0; row_idx < size; row_idx++) {
		if (FlatVector::IsNull(vector, row_idx)) {
			continue;
		}
		auto cptr = cdata + offsets[row_idx];
		auto str_len = offsets[row_idx + 1] - offsets[row_idx];
		if (str_len > NumericLimits<uint32_t>::Maximum()) { // LCOV_EXCL_START
			throw ConversionException("DuckDB does not support Strings over 4GB");
		} // LCOV_EXCL_STOP
		strings[row_idx] = string_t(cptr, str_len);
	}
}

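//! Zero-copy conversion: points the DuckDB vector directly at the Arrow data buffer, offset to the current chunk.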
static void DirectConversion(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
                             int64_t nested_offset) {
	auto internal_type = GetTypeIdSize(vector.GetType().InternalType());
	auto data_ptr = ArrowBufferData<data_t>(array, 1) + internal_type * (scan_state.chunk_offset + array.offset);
	if (nested_offset != -1) {
		data_ptr = ArrowBufferData<data_t>(array, 1) + internal_type * (array.offset + nested_offset);
	}
	FlatVector::SetData(vector, data_ptr);
}

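//! Converts Arrow time values to DuckDB TIME (microseconds) by multiplying with the given conversion factor.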
template <class T>
static void TimeConversion(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, int64_t nested_offset,
                           idx_t size, int64_t conversion) {
	auto tgt_ptr = FlatVector::GetData<dtime_t>(vector);
	auto &validity_mask = FlatVector::Validity(vector);
	auto src_ptr = (T *)array.buffers[1] + scan_state.chunk_offset + array.offset;
	if (nested_offset != -1) {
		src_ptr = (T *)array.buffers[1] + nested_offset + array.offset;
	}
	for (idx_t row = 0; row < size; row++) {
		if (!validity_mask.RowIsValid(row)) {
			continue;
		}
		if (!TryMultiplyOperator::Operation((int64_t)src_ptr[row], conversion, tgt_ptr[row].micros)) {
			throw ConversionException("Could not convert Time to Microsecond");
		}
	}
}

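//! Converts Arrow timestamp-with-timezone values to DuckDB TIMESTAMPTZ (microseconds).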
static void TimestampTZConversion(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
                                  int64_t nested_offset, idx_t size, int64_t conversion) {
	auto tgt_ptr = FlatVector::GetData<timestamp_t>(vector);
	auto &validity_mask = FlatVector::Validity(vector);
	auto src_ptr = ArrowBufferData<int64_t>(array, 1) + scan_state.chunk_offset + array.offset;
	if (nested_offset != -1) {
		src_ptr = ArrowBufferData<int64_t>(array, 1) + nested_offset + array.offset;
	}
	for (idx_t row = 0; row < size; row++) {
		if (!validity_mask.RowIsValid(row)) {
			continue;
		}
		if (!TryMultiplyOperator::Operation(src_ptr[row], conversion, tgt_ptr[row].value)) {
			throw ConversionException("Could not convert TimestampTZ to Microsecond");
		}
	}
}

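//! Converts Arrow duration values to DuckDB INTERVAL, storing everything in the micros field.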
static void IntervalConversionUs(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
                                 int64_t nested_offset, idx_t size, int64_t conversion) {
	auto tgt_ptr = FlatVector::GetData<interval_t>(vector);
	auto src_ptr = ArrowBufferData<int64_t>(array, 1) + scan_state.chunk_offset + array.offset;
	if (nested_offset != -1) {
		src_ptr = ArrowBufferData<int64_t>(array, 1) + nested_offset + array.offset;
	}
	for (idx_t row = 0; row < size; row++) {
		tgt_ptr[row].days = 0;
		tgt_ptr[row].months = 0;
		if (!TryMultiplyOperator::Operation(src_ptr[row], conversion, tgt_ptr[row].micros)) {
			throw ConversionException("Could not convert Interval to Microsecond");
		}
	}
}

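//! Converts Arrow month intervals to DuckDB INTERVAL, storing the value in the months field.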
static void IntervalConversionMonths(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
                                     int64_t nested_offset, idx_t size) {
	auto tgt_ptr = FlatVector::GetData<interval_t>(vector);
	auto src_ptr = ArrowBufferData<int32_t>(array, 1) + scan_state.chunk_offset + array.offset;
	if (nested_offset != -1) {
		src_ptr = ArrowBufferData<int32_t>(array, 1) + nested_offset + array.offset;
	}
	for (idx_t row = 0; row < size; row++) {
		tgt_ptr[row].days = 0;
		tgt_ptr[row].micros = 0;
		tgt_ptr[row].months = src_ptr[row];
	}
}

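//! Converts Arrow MONTH_DAY_NANO intervals to DuckDB INTERVAL; nanoseconds are truncated to microseconds.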
static void IntervalConversionMonthDayNanos(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
                                            int64_t nested_offset, idx_t size) {
	auto tgt_ptr = FlatVector::GetData<interval_t>(vector);
	auto src_ptr = ArrowBufferData<ArrowInterval>(array, 1) + scan_state.chunk_offset + array.offset;
	if (nested_offset != -1) {
		src_ptr = ArrowBufferData<ArrowInterval>(array, 1) + nested_offset + array.offset;
	}
	for (idx_t row = 0; row < size; row++) {
		tgt_ptr[row].days = src_ptr[row].days;
		tgt_ptr[row].micros = src_ptr[row].nanoseconds / Interval::NANOS_PER_MICRO;
		tgt_ptr[row].months = src_ptr[row].months;
	}
}

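//! Converts a single Arrow column into a DuckDB vector, dispatching on the DuckDB logical type.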
static void ColumnArrowToDuckDB(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state, idx_t size,
                                std::unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
                                idx_t col_idx, ArrowConvertDataIndices &arrow_convert_idx, int64_t nested_offset,
                                ValidityMask *parent_mask) {
	switch (vector.GetType().id()) {
	case LogicalTypeId::SQLNULL:
		vector.Reference(Value());
		break;
	case LogicalTypeId::BOOLEAN: {
		//! Arrow bit-packs boolean values
		//! Let's first figure out where we are in the source array
		auto src_ptr = ArrowBufferData<uint8_t>(array, 1) + (scan_state.chunk_offset + array.offset) / 8;

		if (nested_offset != -1) {
			src_ptr = ArrowBufferData<uint8_t>(array, 1) + (nested_offset + array.offset) / 8;
		}
		auto tgt_ptr = (uint8_t *)FlatVector::GetData(vector);
		int src_pos = 0;
		idx_t cur_bit = scan_state.chunk_offset % 8;
		if (nested_offset != -1) {
			cur_bit = nested_offset % 8;
		}
		for (idx_t row = 0; row < size; row++) {
			if ((src_ptr[src_pos] & (1 << cur_bit)) == 0) {
				tgt_ptr[row] = 0;
			} else {
				tgt_ptr[row] = 1;
			}
			cur_bit++;
			if (cur_bit == 8) {
				src_pos++;
				cur_bit = 0;
			}
		}
		break;
	}
	case LogicalTypeId::TINYINT:
	case LogicalTypeId::SMALLINT:
	case LogicalTypeId::INTEGER:
	case LogicalTypeId::FLOAT:
	case LogicalTypeId::DOUBLE:
	case LogicalTypeId::UTINYINT:
	case LogicalTypeId::USMALLINT:
	case LogicalTypeId::UINTEGER:
	case LogicalTypeId::UBIGINT:
	case LogicalTypeId::BIGINT:
	case LogicalTypeId::HUGEINT:
	case LogicalTypeId::TIMESTAMP:
	case LogicalTypeId::TIMESTAMP_SEC:
	case LogicalTypeId::TIMESTAMP_MS:
	case LogicalTypeId::TIMESTAMP_NS: {
		DirectConversion(vector, array, scan_state, nested_offset);
		break;
	}
	case LogicalTypeId::VARCHAR: {
		auto original_type = arrow_convert_data[col_idx]->variable_sz_type[arrow_convert_idx.variable_sized_index++];
		auto cdata = ArrowBufferData<char>(array, 2);
		if (original_type.first == ArrowVariableSizeType::SUPER_SIZE) {
			auto offsets = ArrowBufferData<uint64_t>(array, 1) + array.offset + scan_state.chunk_offset;
			if (nested_offset != -1) {
				offsets = ArrowBufferData<uint64_t>(array, 1) + array.offset + nested_offset;
			}
			SetVectorString(vector, size, cdata, offsets);
		} else {
			auto offsets = ArrowBufferData<uint32_t>(array, 1) + array.offset + scan_state.chunk_offset;
			if (nested_offset != -1) {
				offsets = ArrowBufferData<uint32_t>(array, 1) + array.offset + nested_offset;
			}
			SetVectorString(vector, size, cdata, offsets);
		}
		break;
	}
	case LogicalTypeId::DATE: {
		auto precision = arrow_convert_data[col_idx]->date_time_precision[arrow_convert_idx.datetime_precision_index++];
		switch (precision) {
		case ArrowDateTimeType::DAYS: {
			DirectConversion(vector, array, scan_state, nested_offset);
			break;
		}
		case ArrowDateTimeType::MILLISECONDS: {
			//! convert date from milliseconds to days
			auto src_ptr = ArrowBufferData<uint64_t>(array, 1) + scan_state.chunk_offset + array.offset;
			if (nested_offset != -1) {
				src_ptr = ArrowBufferData<uint64_t>(array, 1) + nested_offset + array.offset;
			}
			auto tgt_ptr = FlatVector::GetData<date_t>(vector);
			for (idx_t row = 0; row < size; row++) {
				tgt_ptr[row] = date_t(int64_t(src_ptr[row]) / static_cast<int64_t>(1000 * 60 * 60 * 24));
			}
			break;
		}
		default:
			throw NotImplementedException("Unsupported precision for Date Type");
		}
		break;
	}
	case LogicalTypeId::TIME: {
		auto precision = arrow_convert_data[col_idx]->date_time_precision[arrow_convert_idx.datetime_precision_index++];
		switch (precision) {
		case ArrowDateTimeType::SECONDS: {
			TimeConversion<int32_t>(vector, array, scan_state, nested_offset, size, 1000000);
			break;
		}
		case ArrowDateTimeType::MILLISECONDS: {
			TimeConversion<int32_t>(vector, array, scan_state, nested_offset, size, 1000);
			break;
		}
		case ArrowDateTimeType::MICROSECONDS: {
			TimeConversion<int64_t>(vector, array, scan_state, nested_offset, size, 1);
			break;
		}
		case ArrowDateTimeType::NANOSECONDS: {
			auto tgt_ptr = FlatVector::GetData<dtime_t>(vector);
			auto src_ptr = ArrowBufferData<int64_t>(array, 1) + scan_state.chunk_offset + array.offset;
			if (nested_offset != -1) {
				src_ptr = ArrowBufferData<int64_t>(array, 1) + nested_offset + array.offset;
			}
			for (idx_t row = 0; row < size; row++) {
				tgt_ptr[row].micros = src_ptr[row] / 1000;
			}
			break;
		}
		default:
			throw NotImplementedException("Unsupported precision for Time Type");
		}
		break;
	}
	case LogicalTypeId::TIMESTAMP_TZ: {
		auto precision = arrow_convert_data[col_idx]->date_time_precision[arrow_convert_idx.datetime_precision_index++];
		switch (precision) {
		case ArrowDateTimeType::SECONDS: {
			TimestampTZConversion(vector, array, scan_state, nested_offset, size, 1000000);
			break;
		}
		case ArrowDateTimeType::MILLISECONDS: {
			TimestampTZConversion(vector, array, scan_state, nested_offset, size, 1000);
			break;
		}
		case ArrowDateTimeType::MICROSECONDS: {
			DirectConversion(vector, array, scan_state, nested_offset);
			break;
		}
		case ArrowDateTimeType::NANOSECONDS: {
			auto tgt_ptr = FlatVector::GetData<timestamp_t>(vector);
			auto src_ptr = ArrowBufferData<int64_t>(array, 1) + scan_state.chunk_offset + array.offset;
			if (nested_offset != -1) {
				src_ptr = ArrowBufferData<int64_t>(array, 1) + nested_offset + array.offset;
			}
			for (idx_t row = 0; row < size; row++) {
				tgt_ptr[row].value = src_ptr[row] / 1000;
			}
			break;
		}
		default:
			throw NotImplementedException("Unsupported precision for TimestampTZ Type");
		}
		break;
	}
	case LogicalTypeId::INTERVAL: {
		auto precision = arrow_convert_data[col_idx]->date_time_precision[arrow_convert_idx.datetime_precision_index++];
		switch (precision) {
		case ArrowDateTimeType::SECONDS: {
			IntervalConversionUs(vector, array, scan_state, nested_offset, size, 1000000);
			break;
		}
		case ArrowDateTimeType::DAYS:
		case ArrowDateTimeType::MILLISECONDS: {
			IntervalConversionUs(vector, array, scan_state, nested_offset, size, 1000);
			break;
		}
		case ArrowDateTimeType::MICROSECONDS: {
			IntervalConversionUs(vector, array, scan_state, nested_offset, size, 1);
			break;
		}
		case ArrowDateTimeType::NANOSECONDS: {
			auto tgt_ptr = FlatVector::GetData<interval_t>(vector);
			auto src_ptr = ArrowBufferData<int64_t>(array, 1) + scan_state.chunk_offset + array.offset;
			if (nested_offset != -1) {
				src_ptr = ArrowBufferData<int64_t>(array, 1) + nested_offset + array.offset;
			}
			for (idx_t row = 0; row < size; row++) {
				tgt_ptr[row].micros = src_ptr[row] / 1000;
				tgt_ptr[row].days = 0;
				tgt_ptr[row].months = 0;
			}
			break;
		}
		case ArrowDateTimeType::MONTHS: {
			IntervalConversionMonths(vector, array, scan_state, nested_offset, size);
			break;
		}
		case ArrowDateTimeType::MONTH_DAY_NANO: {
			IntervalConversionMonthDayNanos(vector, array, scan_state, nested_offset, size);
			break;
		}
		default:
			throw NotImplementedException("Unsupported precision for Interval/Duration Type");
		}
		break;
	}
	case LogicalTypeId::DECIMAL: {
		auto val_mask = FlatVector::Validity(vector);
		//! We have to convert from INT128
		auto src_ptr = ArrowBufferData<hugeint_t>(array, 1) + scan_state.chunk_offset + array.offset;
		if (nested_offset != -1) {
			src_ptr = ArrowBufferData<hugeint_t>(array, 1) + nested_offset + array.offset;
		}
		switch (vector.GetType().InternalType()) {
		case PhysicalType::INT16: {
			auto tgt_ptr = FlatVector::GetData<int16_t>(vector);
			for (idx_t row = 0; row < size; row++) {
				if (val_mask.RowIsValid(row)) {
					auto result = Hugeint::TryCast(src_ptr[row], tgt_ptr[row]);
					D_ASSERT(result);
					(void)result;
				}
			}
			break;
		}
		case PhysicalType::INT32: {
			auto tgt_ptr = FlatVector::GetData<int32_t>(vector);
			for (idx_t row = 0; row < size; row++) {
				if (val_mask.RowIsValid(row)) {
					auto result = Hugeint::TryCast(src_ptr[row], tgt_ptr[row]);
					D_ASSERT(result);
					(void)result;
				}
			}
			break;
		}
		case PhysicalType::INT64: {
			auto tgt_ptr = FlatVector::GetData<int64_t>(vector);
			for (idx_t row = 0; row < size; row++) {
				if (val_mask.RowIsValid(row)) {
					auto result = Hugeint::TryCast(src_ptr[row], tgt_ptr[row]);
					D_ASSERT(result);
					(void)result;
				}
			}
			break;
		}
		case PhysicalType::INT128: {
			FlatVector::SetData(vector,
			                    ArrowBufferData<data_t>(array, 1) + GetTypeIdSize(vector.GetType().InternalType()) *
			                                                            (scan_state.chunk_offset + array.offset));
			break;
		}
		default:
			throw NotImplementedException("Unsupported physical type for Decimal: %s",
			                              TypeIdToString(vector.GetType().InternalType()));
		}
		break;
	}
	case LogicalTypeId::BLOB: {
		ArrowToDuckDBBlob(vector, array, scan_state, size, arrow_convert_data, col_idx, arrow_convert_idx,
		                  nested_offset);
		break;
	}
	case LogicalTypeId::LIST: {
		ArrowToDuckDBList(vector, array, scan_state, size, arrow_convert_data, col_idx, arrow_convert_idx,
		                  nested_offset, parent_mask);
		break;
	}
	case LogicalTypeId::MAP: {
		ArrowToDuckDBList(vector, array, scan_state, size, arrow_convert_data, col_idx, arrow_convert_idx,
		                  nested_offset, parent_mask);
		ArrowToDuckDBMapVerify(vector, size);
		break;
	}
	case LogicalTypeId::STRUCT: {
		//! Fill the children
		auto &child_entries = StructVector::GetEntries(vector);
		auto &struct_validity_mask = FlatVector::Validity(vector);
		for (idx_t type_idx = 0; type_idx < (idx_t)array.n_children; type_idx++) {
			SetValidityMask(*child_entries[type_idx], *array.children[type_idx], scan_state, size, nested_offset);
			if (!struct_validity_mask.AllValid()) {
				auto &child_validity_mark = FlatVector::Validity(*child_entries[type_idx]);
				for (idx_t i = 0; i < size; i++) {
					if (!struct_validity_mask.RowIsValid(i)) {
						child_validity_mark.SetInvalid(i);
					}
				}
			}
			ColumnArrowToDuckDB(*child_entries[type_idx], *array.children[type_idx], scan_state, size,
			                    arrow_convert_data, col_idx, arrow_convert_idx, nested_offset, &struct_validity_mask);
		}
		break;
	}
	default:
		throw NotImplementedException("Unsupported type %s", vector.GetType().ToString());
	}
}

template <class T>
static void SetSelectionVectorLoop(SelectionVector &sel, data_ptr_t indices_p, idx_t size) {
	auto indices = reinterpret_cast<T *>(indices_p);
	for (idx_t row = 0; row < size; row++) {
		sel.set_index(row, indices[row]);
	}
}

template <class T>
static void SetSelectionVectorLoopWithChecks(SelectionVector &sel, data_ptr_t indices_p, idx_t size) {

	auto indices = reinterpret_cast<T *>(indices_p);
	for (idx_t row = 0; row < size; row++) {
		if (indices[row] > NumericLimits<uint32_t>::Maximum()) {
			throw ConversionException("DuckDB only supports indices that fit on an uint32");
		}
		sel.set_index(row, indices[row]);
	}
}

template <class T>
static void SetMaskedSelectionVectorLoop(SelectionVector &sel, data_ptr_t indices_p, idx_t size, ValidityMask &mask,
                                         idx_t last_element_pos) {
	auto indices = reinterpret_cast<T *>(indices_p);
	for (idx_t row = 0; row < size; row++) {
		if (mask.RowIsValid(row)) {
			sel.set_index(row, indices[row]);
		} else {
			//! Need to point to the last element (the NULL entry appended to the dictionary)
			sel.set_index(row, last_element_pos);
		}
	}
}

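//! Fills a selection vector from Arrow dictionary indices; if a validity mask is given, NULL indices are
//! redirected to 'last_element_pos', the NULL entry appended to the dictionary.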
static void SetSelectionVector(SelectionVector &sel, data_ptr_t indices_p, LogicalType &logical_type, idx_t size,
                               ValidityMask *mask = nullptr, idx_t last_element_pos = 0) {
	sel.Initialize(size);

	if (mask) {
		switch (logical_type.id()) {
		case LogicalTypeId::UTINYINT:
			SetMaskedSelectionVectorLoop<uint8_t>(sel, indices_p, size, *mask, last_element_pos);
			break;
		case LogicalTypeId::TINYINT:
			SetMaskedSelectionVectorLoop<int8_t>(sel, indices_p, size, *mask, last_element_pos);
			break;
		case LogicalTypeId::USMALLINT:
			SetMaskedSelectionVectorLoop<uint16_t>(sel, indices_p, size, *mask, last_element_pos);
			break;
		case LogicalTypeId::SMALLINT:
			SetMaskedSelectionVectorLoop<int16_t>(sel, indices_p, size, *mask, last_element_pos);
			break;
		case LogicalTypeId::UINTEGER:
			if (last_element_pos > NumericLimits<uint32_t>::Maximum()) {
				//! It's guaranteed that our indices will point to the last element, so just throw an error
				throw ConversionException("DuckDB only supports indices that fit on an uint32");
			}
			SetMaskedSelectionVectorLoop<uint32_t>(sel, indices_p, size, *mask, last_element_pos);
			break;
		case LogicalTypeId::INTEGER:
			SetMaskedSelectionVectorLoop<int32_t>(sel, indices_p, size, *mask, last_element_pos);
			break;
		case LogicalTypeId::UBIGINT:
			if (last_element_pos > NumericLimits<uint32_t>::Maximum()) {
				//! It's guaranteed that our indices will point to the last element, so just throw an error
				throw ConversionException("DuckDB only supports indices that fit on an uint32");
			}
			SetMaskedSelectionVectorLoop<uint64_t>(sel, indices_p, size, *mask, last_element_pos);
			break;
		case LogicalTypeId::BIGINT:
			if (last_element_pos > NumericLimits<uint32_t>::Maximum()) {
				//! It's guaranteed that our indices will point to the last element, so just throw an error
				throw ConversionException("DuckDB only supports indices that fit on an uint32");
			}
			SetMaskedSelectionVectorLoop<int64_t>(sel, indices_p, size, *mask, last_element_pos);
			break;

		default:
			throw NotImplementedException("(Arrow) Unsupported type for selection vectors %s", logical_type.ToString());
		}

	} else {
		switch (logical_type.id()) {
		case LogicalTypeId::UTINYINT:
			SetSelectionVectorLoop<uint8_t>(sel, indices_p, size);
			break;
		case LogicalTypeId::TINYINT:
			SetSelectionVectorLoop<int8_t>(sel, indices_p, size);
			break;
		case LogicalTypeId::USMALLINT:
			SetSelectionVectorLoop<uint16_t>(sel, indices_p, size);
			break;
		case LogicalTypeId::SMALLINT:
			SetSelectionVectorLoop<int16_t>(sel, indices_p, size);
			break;
		case LogicalTypeId::UINTEGER:
			SetSelectionVectorLoop<uint32_t>(sel, indices_p, size);
			break;
		case LogicalTypeId::INTEGER:
			SetSelectionVectorLoop<int32_t>(sel, indices_p, size);
			break;
		case LogicalTypeId::UBIGINT:
			if (last_element_pos > NumericLimits<uint32_t>::Maximum()) {
				//! We need to check if our indices fit in a uint32_t
				SetSelectionVectorLoopWithChecks<uint64_t>(sel, indices_p, size);
			} else {
				SetSelectionVectorLoop<uint64_t>(sel, indices_p, size);
			}
			break;
		case LogicalTypeId::BIGINT:
			if (last_element_pos > NumericLimits<uint32_t>::Maximum()) {
				//! We need to check if our indices fit in a uint32_t
				SetSelectionVectorLoopWithChecks<int64_t>(sel, indices_p, size);
			} else {
				SetSelectionVectorLoop<int64_t>(sel, indices_p, size);
			}
			break;
		default:
			throw ConversionException("(Arrow) Unsupported type for selection vectors %s", logical_type.ToString());
		}
	}
}

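//! Converts an Arrow dictionary-encoded column: the dictionary values are converted once and cached in the scan
//! state, after which the output vector is sliced with a selection vector built from the dictionary indices.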
static void ColumnArrowToDuckDBDictionary(Vector &vector, ArrowArray &array, ArrowScanLocalState &scan_state,
                                          idx_t size,
                                          std::unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
                                          idx_t col_idx, ArrowConvertDataIndices &arrow_convert_idx) {
	SelectionVector sel;
	auto &dict_vectors = scan_state.arrow_dictionary_vectors;
	if (!dict_vectors.count(col_idx)) {
		//! We need to set the dictionary data for this column
		auto base_vector = make_uniq<Vector>(vector.GetType(), array.dictionary->length);
		SetValidityMask(*base_vector, *array.dictionary, scan_state, array.dictionary->length, 0, array.null_count > 0);
		ColumnArrowToDuckDB(*base_vector, *array.dictionary, scan_state, array.dictionary->length, arrow_convert_data,
		                    col_idx, arrow_convert_idx);
		dict_vectors[col_idx] = std::move(base_vector);
	}
	auto dictionary_type = arrow_convert_data[col_idx]->dictionary_type;
	//! Get Pointer to Indices of Dictionary
	auto indices = ArrowBufferData<data_t>(array, 1) +
	               GetTypeIdSize(dictionary_type.InternalType()) * (scan_state.chunk_offset + array.offset);
	if (array.null_count > 0) {
		ValidityMask indices_validity;
		GetValidityMask(indices_validity, array, scan_state, size);
		SetSelectionVector(sel, indices, dictionary_type, size, &indices_validity, array.dictionary->length);
	} else {
		SetSelectionVector(sel, indices, dictionary_type, size);
	}
	vector.Slice(*dict_vectors[col_idx], sel, size);
}

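//! Converts one Arrow chunk into a DuckDB DataChunk, one column at a time; plain columns are converted
//! (zero-copy where possible) and dictionary-encoded columns are handled separately.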
void ArrowTableFunction::ArrowToDuckDB(ArrowScanLocalState &scan_state,
                                       unordered_map<idx_t, unique_ptr<ArrowConvertData>> &arrow_convert_data,
                                       DataChunk &output, idx_t start, bool arrow_scan_is_projected) {
	for (idx_t idx = 0; idx < output.ColumnCount(); idx++) {
		auto col_idx = scan_state.column_ids[idx];

		// If projection was not pushed down into the arrow scanner, but projection pushdown is enabled on the
		// table function, we need to use the original column ids here.
		auto arrow_array_idx = arrow_scan_is_projected ? idx : col_idx;

		if (col_idx == COLUMN_IDENTIFIER_ROW_ID) {
			// This column is skipped by the projection pushdown
			continue;
		}

		ArrowConvertDataIndices arrow_convert_idx {0, 0};
		auto &array = *scan_state.chunk->arrow_array.children[arrow_array_idx];
		if (!array.release) {
			throw InvalidInputException("arrow_scan: released array passed");
		}
		if (array.length != scan_state.chunk->arrow_array.length) {
			throw InvalidInputException("arrow_scan: array length mismatch");
		}
		// Make sure this Vector keeps the Arrow chunk alive in case we can zero-copy the data
		output.data[idx].GetBuffer()->SetAuxiliaryData(make_uniq<ArrowAuxiliaryData>(scan_state.chunk));
		if (array.dictionary) {
			ColumnArrowToDuckDBDictionary(output.data[idx], array, scan_state, output.size(), arrow_convert_data,
			                              col_idx, arrow_convert_idx);
		} else {
			SetValidityMask(output.data[idx], array, scan_state, output.size(), -1);
			ColumnArrowToDuckDB(output.data[idx], array, scan_state, output.size(), arrow_convert_data, col_idx,
			                    arrow_convert_idx);
		}
	}
}

} // namespace duckdb